Whamcloud - gitweb
0d98af351bbec4b53ceabc4ef4e0c663d20ef2e3
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         atomic_set(&set->set_refcount, 1);
62         cfs_waitq_init(&set->set_waitq);
63         spin_lock_init(&set->set_lock);
64 }
65
66 void lov_finish_set(struct lov_request_set *set)
67 {
68         struct list_head *pos, *n;
69         ENTRY;
70
71         LASSERT(set);
72         list_for_each_safe(pos, n, &set->set_list) {
73                 struct lov_request *req = list_entry(pos, struct lov_request,
74                                                      rq_link);
75                 list_del_init(&req->rq_link);
76
77                 if (req->rq_oi.oi_oa)
78                         OBDO_FREE(req->rq_oi.oi_oa);
79                 if (req->rq_oi.oi_md)
80                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
81                 if (req->rq_oi.oi_osfs)
82                         OBD_FREE(req->rq_oi.oi_osfs,
83                                  sizeof(*req->rq_oi.oi_osfs));
84                 OBD_FREE(req, sizeof(*req));
85         }
86
87         if (set->set_pga) {
88                 int len = set->set_oabufs * sizeof(*set->set_pga);
89                 OBD_FREE(set->set_pga, len);
90         }
91         if (set->set_lockh)
92                 lov_llh_put(set->set_lockh);
93
94         OBD_FREE(set, sizeof(*set));
95         EXIT;
96 }
97
98 int lov_finished_set(struct lov_request_set *set)
99 {
100         CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
101                set->set_count);
102         return set->set_completes == set->set_count;
103 }
104
105
106 void lov_update_set(struct lov_request_set *set,
107                     struct lov_request *req, int rc)
108 {
109         req->rq_complete = 1;
110         req->rq_rc = rc;
111
112         set->set_completes++;
113         if (rc == 0)
114                 set->set_success++;
115
116         cfs_waitq_signal(&set->set_waitq);
117 }
118
119 int lov_update_common_set(struct lov_request_set *set,
120                           struct lov_request *req, int rc)
121 {
122         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
123         ENTRY;
124
125         lov_update_set(set, req, rc);
126
127         /* grace error on inactive ost */
128         if (rc && !(lov->lov_tgts[req->rq_idx] &&
129                     lov->lov_tgts[req->rq_idx]->ltd_active))
130                 rc = 0;
131
132         /* FIXME in raid1 regime, should return 0 */
133         RETURN(rc);
134 }
135
136 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
137 {
138         list_add_tail(&req->rq_link, &set->set_list);
139         set->set_count++;
140         req->rq_rqset = set;
141 }
142
143 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
144 {
145         struct lov_request_set *set = req->rq_rqset;
146         struct lustre_handle *lov_lockhp;
147         struct lov_oinfo *loi;
148         ENTRY;
149
150         LASSERT(set != NULL);
151         LASSERT(set->set_oi != NULL);
152
153         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
154         loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
155
156         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
157          * and that copy can be arbitrarily out of date.
158          *
159          * The LOV API is due for a serious rewriting anyways, and this
160          * can be addressed then. */
161
162         if (rc == ELDLM_OK) {
163                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
164                 __u64 tmp;
165
166                 LASSERT(lock != NULL);
167                 lov_stripe_lock(set->set_oi->oi_md);
168                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
169                 tmp = loi->loi_lvb.lvb_size;
170                 /* Extend KMS up to the end of this lock and no further
171                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
172                 if (tmp > lock->l_policy_data.l_extent.end)
173                         tmp = lock->l_policy_data.l_extent.end + 1;
174                 if (tmp >= loi->loi_kms) {
175                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
176                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
177                         loi->loi_kms = tmp;
178                         loi->loi_kms_valid = 1;
179                 } else {
180                         LDLM_DEBUG(lock, "lock acquired, setting rss="
181                                    LPU64"; leaving kms="LPU64", end="LPU64,
182                                    loi->loi_lvb.lvb_size, loi->loi_kms,
183                                    lock->l_policy_data.l_extent.end);
184                 }
185                 lov_stripe_unlock(set->set_oi->oi_md);
186                 ldlm_lock_allow_match(lock);
187                 LDLM_LOCK_PUT(lock);
188         } else if ((rc == ELDLM_LOCK_ABORTED) &&
189                    (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
190                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
191                 lov_stripe_lock(set->set_oi->oi_md);
192                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
193                 lov_stripe_unlock(set->set_oi->oi_md);
194                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
195                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
196                 rc = ELDLM_OK;
197         } else {
198                 struct obd_export *exp = set->set_exp;
199                 struct lov_obd *lov = &exp->exp_obd->u.lov;
200
201                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
202                 if (lov->lov_tgts[req->rq_idx] &&
203                     lov->lov_tgts[req->rq_idx]->ltd_active) {
204                         /* -EUSERS used by OST to report file contention */
205                         if (rc != -EINTR && rc != -EUSERS)
206                                 CERROR("enqueue objid "LPX64" subobj "
207                                        LPX64" on OST idx %d: rc %d\n",
208                                        set->set_oi->oi_md->lsm_object_id,
209                                        loi->loi_id, loi->loi_ost_idx, rc);
210                 } else {
211                         rc = ELDLM_OK;
212                 }
213         }
214         lov_update_set(set, req, rc);
215         RETURN(rc);
216 }
217
218 /* The callback for osc_enqueue that updates lov info for every OSC request. */
219 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
220 {
221         struct ldlm_enqueue_info *einfo;
222         struct lov_request *lovreq;
223
224         lovreq = container_of(oinfo, struct lov_request, rq_oi);
225         einfo = lovreq->rq_rqset->set_ei;
226         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
227 }
228
229 static int enqueue_done(struct lov_request_set *set, __u32 mode)
230 {
231         struct lov_request *req;
232         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
233         int rc = 0;
234         ENTRY;
235
236         /* enqueue/match success, just return */
237         if (set->set_completes && set->set_completes == set->set_success)
238                 RETURN(0);
239
240         /* cancel enqueued/matched locks */
241         list_for_each_entry(req, &set->set_list, rq_link) {
242                 struct lustre_handle *lov_lockhp;
243
244                 if (!req->rq_complete || req->rq_rc)
245                         continue;
246
247                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
248                 LASSERT(lov_lockhp);
249                 if (!lustre_handle_is_used(lov_lockhp))
250                         continue;
251
252                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
253                                 req->rq_oi.oi_md, mode, lov_lockhp, 0, 0);
254                 if (rc && lov->lov_tgts[req->rq_idx] &&
255                     lov->lov_tgts[req->rq_idx]->ltd_active)
256                         CERROR("cancelling obdjid "LPX64" on OST "
257                                "idx %d error: rc = %d\n",
258                                req->rq_oi.oi_md->lsm_object_id,
259                                req->rq_idx, rc);
260         }
261         if (set->set_lockh)
262                 lov_llh_put(set->set_lockh);
263         RETURN(rc);
264 }
265
266 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
267                          struct ptlrpc_request_set *rqset)
268 {
269         int ret = 0;
270         ENTRY;
271
272         if (set == NULL)
273                 RETURN(0);
274         LASSERT(set->set_exp);
275         /* Do enqueue_done only for sync requests and if any request
276          * succeeded. */
277         if (!rqset) {
278                 if (rc)
279                         set->set_completes = 0;
280                 ret = enqueue_done(set, mode);
281         } else if (set->set_lockh)
282                 lov_llh_put(set->set_lockh);
283
284         lov_put_reqset(set);
285
286         RETURN(rc ? rc : ret);
287 }
288
289 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
290                          struct ldlm_enqueue_info *einfo,
291                          struct lov_request_set **reqset)
292 {
293         struct lov_obd *lov = &exp->exp_obd->u.lov;
294         struct lov_request_set *set;
295         int i, rc = 0;
296         struct lov_oinfo *loi;
297         ENTRY;
298
299         OBD_ALLOC(set, sizeof(*set));
300         if (set == NULL)
301                 RETURN(-ENOMEM);
302         lov_init_set(set);
303
304         set->set_exp = exp;
305         set->set_oi = oinfo;
306         set->set_ei = einfo;
307         set->set_lockh = lov_llh_new(oinfo->oi_md);
308         if (set->set_lockh == NULL)
309                 GOTO(out_set, rc = -ENOMEM);
310         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
311
312         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
313                 struct lov_request *req;
314                 obd_off start, end;
315
316                 loi = oinfo->oi_md->lsm_oinfo[i];
317                 if (!lov_stripe_intersects(oinfo->oi_md, i,
318                                            oinfo->oi_policy.l_extent.start,
319                                            oinfo->oi_policy.l_extent.end,
320                                            &start, &end))
321                         continue;
322
323                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
324                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
325                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
326                         continue;
327                 }
328
329                 OBD_ALLOC(req, sizeof(*req));
330                 if (req == NULL)
331                         GOTO(out_set, rc = -ENOMEM);
332
333                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
334                         sizeof(struct lov_oinfo *) +
335                         sizeof(struct lov_oinfo);
336                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
337                 if (req->rq_oi.oi_md == NULL) {
338                         OBD_FREE(req, sizeof(*req));
339                         GOTO(out_set, rc = -ENOMEM);
340                 }
341                 req->rq_oi.oi_md->lsm_oinfo[0] =
342                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
343                         sizeof(struct lov_oinfo *);
344
345                 /* Set lov request specific parameters. */
346                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
347                 req->rq_oi.oi_cb_up = cb_update_enqueue;
348                 req->rq_oi.oi_flags = oinfo->oi_flags;
349
350                 LASSERT(req->rq_oi.oi_lockh);
351
352                 req->rq_oi.oi_policy.l_extent.gid =
353                         oinfo->oi_policy.l_extent.gid;
354                 req->rq_oi.oi_policy.l_extent.start = start;
355                 req->rq_oi.oi_policy.l_extent.end = end;
356
357                 req->rq_idx = loi->loi_ost_idx;
358                 req->rq_stripe = i;
359
360                 /* XXX LOV STACKING: submd should be from the subobj */
361                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
362                 req->rq_oi.oi_md->lsm_object_gr = loi->loi_gr;
363                 req->rq_oi.oi_md->lsm_stripe_count = 0;
364                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
365                         loi->loi_kms_valid;
366                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
367                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
368
369                 lov_set_add_req(req, set);
370         }
371         if (!set->set_count)
372                 GOTO(out_set, rc = -EIO);
373         *reqset = set;
374         RETURN(0);
375 out_set:
376         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
377         RETURN(rc);
378 }
379
380 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
381                          int rc)
382 {
383         int ret = rc;
384         ENTRY;
385
386         if (rc > 0)
387                 ret = 0;
388         else if (rc == 0)
389                 ret = 1;
390         lov_update_set(set, req, ret);
391         RETURN(rc);
392 }
393
394 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
395 {
396         int rc = 0;
397         ENTRY;
398
399         if (set == NULL)
400                 RETURN(0);
401         LASSERT(set->set_exp);
402         rc = enqueue_done(set, mode);
403         if ((set->set_count == set->set_success) &&
404             (flags & LDLM_FL_TEST_LOCK))
405                 lov_llh_put(set->set_lockh);
406
407         lov_put_reqset(set);
408
409         RETURN(rc);
410 }
411
412 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
413                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
414                        __u32 mode, struct lustre_handle *lockh,
415                        struct lov_request_set **reqset)
416 {
417         struct lov_obd *lov = &exp->exp_obd->u.lov;
418         struct lov_request_set *set;
419         int i, rc = 0;
420         struct lov_oinfo *loi;
421         ENTRY;
422
423         OBD_ALLOC(set, sizeof(*set));
424         if (set == NULL)
425                 RETURN(-ENOMEM);
426         lov_init_set(set);
427
428         set->set_exp = exp;
429         set->set_oi = oinfo;
430         set->set_oi->oi_md = lsm;
431         set->set_lockh = lov_llh_new(lsm);
432         if (set->set_lockh == NULL)
433                 GOTO(out_set, rc = -ENOMEM);
434         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
435
436         for (i = 0; i < lsm->lsm_stripe_count; i++){
437                 struct lov_request *req;
438                 obd_off start, end;
439
440                 loi = lsm->lsm_oinfo[i];
441                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
442                                            policy->l_extent.end, &start, &end))
443                         continue;
444
445                 /* FIXME raid1 should grace this error */
446                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
447                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
448                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
449                         GOTO(out_set, rc = -EIO);
450                 }
451
452                 OBD_ALLOC(req, sizeof(*req));
453                 if (req == NULL)
454                         GOTO(out_set, rc = -ENOMEM);
455
456                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
457                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
458                 if (req->rq_oi.oi_md == NULL) {
459                         OBD_FREE(req, sizeof(*req));
460                         GOTO(out_set, rc = -ENOMEM);
461                 }
462
463                 req->rq_oi.oi_policy.l_extent.start = start;
464                 req->rq_oi.oi_policy.l_extent.end = end;
465                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
466
467                 req->rq_idx = loi->loi_ost_idx;
468                 req->rq_stripe = i;
469
470                 /* XXX LOV STACKING: submd should be from the subobj */
471                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
472                 req->rq_oi.oi_md->lsm_object_gr = loi->loi_gr;
473                 req->rq_oi.oi_md->lsm_stripe_count = 0;
474
475                 lov_set_add_req(req, set);
476         }
477         if (!set->set_count)
478                 GOTO(out_set, rc = -EIO);
479         *reqset = set;
480         RETURN(rc);
481 out_set:
482         lov_fini_match_set(set, mode, 0);
483         RETURN(rc);
484 }
485
486 int lov_fini_cancel_set(struct lov_request_set *set)
487 {
488         int rc = 0;
489         ENTRY;
490
491         if (set == NULL)
492                 RETURN(0);
493
494         LASSERT(set->set_exp);
495         if (set->set_lockh)
496                 lov_llh_put(set->set_lockh);
497
498         lov_put_reqset(set);
499
500         RETURN(rc);
501 }
502
503 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
504                         struct lov_stripe_md *lsm, __u32 mode,
505                         struct lustre_handle *lockh,
506                         struct lov_request_set **reqset)
507 {
508         struct lov_request_set *set;
509         int i, rc = 0;
510         struct lov_oinfo *loi;
511         ENTRY;
512
513         OBD_ALLOC(set, sizeof(*set));
514         if (set == NULL)
515                 RETURN(-ENOMEM);
516         lov_init_set(set);
517
518         set->set_exp = exp;
519         set->set_oi = oinfo;
520         set->set_oi->oi_md = lsm;
521         set->set_lockh = lov_handle2llh(lockh);
522         if (set->set_lockh == NULL) {
523                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
524                 GOTO(out_set, rc = -EINVAL);
525         }
526         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
527
528         for (i = 0; i < lsm->lsm_stripe_count; i++){
529                 struct lov_request *req;
530                 struct lustre_handle *lov_lockhp;
531
532                 loi = lsm->lsm_oinfo[i];
533                 lov_lockhp = set->set_lockh->llh_handles + i;
534                 if (!lustre_handle_is_used(lov_lockhp)) {
535                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
536                                loi->loi_ost_idx, loi->loi_id);
537                         continue;
538                 }
539
540                 OBD_ALLOC(req, sizeof(*req));
541                 if (req == NULL)
542                         GOTO(out_set, rc = -ENOMEM);
543
544                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
545                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
546                 if (req->rq_oi.oi_md == NULL) {
547                         OBD_FREE(req, sizeof(*req));
548                         GOTO(out_set, rc = -ENOMEM);
549                 }
550
551                 req->rq_idx = loi->loi_ost_idx;
552                 req->rq_stripe = i;
553
554                 /* XXX LOV STACKING: submd should be from the subobj */
555                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
556                 req->rq_oi.oi_md->lsm_stripe_count = 0;
557
558                 lov_set_add_req(req, set);
559         }
560         if (!set->set_count)
561                 GOTO(out_set, rc = -EIO);
562         *reqset = set;
563         RETURN(rc);
564 out_set:
565         lov_fini_cancel_set(set);
566         RETURN(rc);
567 }
568
569 static int create_done(struct obd_export *exp, struct lov_request_set *set,
570                        struct lov_stripe_md **lsmp)
571 {
572         struct lov_obd *lov = &exp->exp_obd->u.lov;
573         struct obd_trans_info *oti = set->set_oti;
574         struct obdo *src_oa = set->set_oi->oi_oa;
575         struct lov_request *req;
576         struct obdo *ret_oa = NULL;
577         int attrset = 0, rc = 0;
578         ENTRY;
579
580         LASSERT(set->set_completes);
581
582         /* try alloc objects on other osts if osc_create fails for
583          * exceptions: RPC failure, ENOSPC, etc */
584         if (set->set_count != set->set_success) {
585                 list_for_each_entry (req, &set->set_list, rq_link) {
586                         if (req->rq_rc == 0)
587                                 continue;
588
589                         set->set_completes--;
590                         req->rq_complete = 0;
591
592                         rc = qos_remedy_create(set, req);
593                         lov_update_create_set(set, req, rc);
594                 }
595         }
596
597         /* no successful creates */
598         if (set->set_success == 0)
599                 GOTO(cleanup, rc);
600
601         if (set->set_count != set->set_success) {
602                 set->set_count = set->set_success;
603                 qos_shrink_lsm(set);
604         }
605
606         OBDO_ALLOC(ret_oa);
607         if (ret_oa == NULL)
608                 GOTO(cleanup, rc = -ENOMEM);
609
610         list_for_each_entry(req, &set->set_list, rq_link) {
611                 if (!req->rq_complete || req->rq_rc)
612                         continue;
613                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
614                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
615                                 req->rq_stripe, &attrset);
616         }
617         if (src_oa->o_valid & OBD_MD_FLSIZE &&
618             ret_oa->o_size != src_oa->o_size) {
619                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
620                        src_oa->o_size, ret_oa->o_size);
621                 LBUG();
622         }
623         ret_oa->o_id = src_oa->o_id;
624         ret_oa->o_gr = src_oa->o_gr;
625         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
626         memcpy(src_oa, ret_oa, sizeof(*src_oa));
627         OBDO_FREE(ret_oa);
628
629         *lsmp = set->set_oi->oi_md;
630         GOTO(done, rc = 0);
631
632 cleanup:
633         list_for_each_entry(req, &set->set_list, rq_link) {
634                 struct obd_export *sub_exp;
635                 int err = 0;
636
637                 if (!req->rq_complete || req->rq_rc)
638                         continue;
639
640                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
641                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
642                 if (err)
643                         CERROR("Failed to uncreate objid "LPX64" subobj "
644                                LPX64" on OST idx %d: rc = %d\n",
645                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
646                                req->rq_idx, rc);
647         }
648         if (*lsmp == NULL)
649                 obd_free_memmd(exp, &set->set_oi->oi_md);
650 done:
651         if (oti && set->set_cookies) {
652                 oti->oti_logcookies = set->set_cookies;
653                 if (!set->set_cookie_sent) {
654                         oti_free_cookies(oti);
655                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
656                 } else {
657                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
658                 }
659         }
660         RETURN(rc);
661 }
662
663 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
664 {
665         int rc = 0;
666         ENTRY;
667
668         if (set == NULL)
669                 RETURN(0);
670         LASSERT(set->set_exp);
671         if (set->set_completes)
672                 rc = create_done(set->set_exp, set, lsmp);
673
674         lov_put_reqset(set);
675         RETURN(rc);
676 }
677
678 int lov_update_create_set(struct lov_request_set *set,
679                           struct lov_request *req, int rc)
680 {
681         struct obd_trans_info *oti = set->set_oti;
682         struct lov_stripe_md *lsm = set->set_oi->oi_md;
683         struct lov_oinfo *loi;
684         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
685         ENTRY;
686
687         if (rc && lov->lov_tgts[req->rq_idx] &&
688             lov->lov_tgts[req->rq_idx]->ltd_active) {
689                 CERROR("error creating fid "LPX64" sub-object"
690                        " on OST idx %d/%d: rc = %d\n",
691                        set->set_oi->oi_oa->o_id, req->rq_idx,
692                        lsm->lsm_stripe_count, rc);
693                 if (rc > 0) {
694                         CERROR("obd_create returned invalid err %d\n", rc);
695                         rc = -EIO;
696                 }
697         }
698
699         spin_lock(&set->set_lock);
700         req->rq_stripe = set->set_success;
701         loi = lsm->lsm_oinfo[req->rq_stripe];
702         if (rc) {
703                 lov_update_set(set, req, rc);
704                 spin_unlock(&set->set_lock);
705                 RETURN(rc);
706         }
707
708         loi->loi_id = req->rq_oi.oi_oa->o_id;
709         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
710         loi->loi_ost_idx = req->rq_idx;
711         loi_init(loi);
712
713         if (oti && set->set_cookies)
714                 ++oti->oti_logcookies;
715         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
716                 set->set_cookie_sent++;
717
718         lov_update_set(set, req, rc);
719         spin_unlock(&set->set_lock);
720
721         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
722                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
723
724         RETURN(0);
725 }
726
727 int cb_create_update(struct obd_info *oinfo, int rc)
728 {
729         struct lov_request *lovreq;
730
731         lovreq = container_of(oinfo, struct lov_request, rq_oi);
732         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
733         if (lov_finished_set(lovreq->rq_rqset))
734                 lov_put_reqset(lovreq->rq_rqset);
735         return rc;
736 }
737
738
739 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
740                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
741                         struct obd_trans_info *oti,
742                         struct lov_request_set **reqset)
743 {
744         struct lov_request_set *set;
745         int rc = 0;
746         ENTRY;
747
748         OBD_ALLOC(set, sizeof(*set));
749         if (set == NULL)
750                 RETURN(-ENOMEM);
751         lov_init_set(set);
752
753         set->set_exp = exp;
754         set->set_oi = oinfo;
755         set->set_oi->oi_md = *lsmp;
756         set->set_oi->oi_oa = src_oa;
757         set->set_oti = oti;
758         lov_get_reqset(set);
759
760         rc = qos_prep_create(exp, set);
761         /* qos_shrink_lsm() may have allocated a new lsm */
762         *lsmp = oinfo->oi_md;
763         if (rc) {
764                 lov_fini_create_set(set, lsmp);
765                 lov_put_reqset(set);
766         } else {
767                 *reqset = set;
768         }
769         RETURN(rc);
770 }
771
772 static int common_attr_done(struct lov_request_set *set)
773 {
774         struct list_head *pos;
775         struct lov_request *req;
776         struct obdo *tmp_oa;
777         int rc = 0, attrset = 0;
778         ENTRY;
779
780         LASSERT(set->set_oi != NULL);
781
782         if (set->set_oi->oi_oa == NULL)
783                 RETURN(0);
784
785         if (!set->set_success)
786                 RETURN(-EIO);
787
788         OBDO_ALLOC(tmp_oa);
789         if (tmp_oa == NULL)
790                 GOTO(out, rc = -ENOMEM);
791
792         list_for_each (pos, &set->set_list) {
793                 req = list_entry(pos, struct lov_request, rq_link);
794
795                 if (!req->rq_complete || req->rq_rc)
796                         continue;
797                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
798                         continue;
799                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
800                                 req->rq_oi.oi_oa->o_valid,
801                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
802         }
803         if (!attrset) {
804                 CERROR("No stripes had valid attrs\n");
805                 rc = -EIO;
806         }
807         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
808         tmp_oa->o_gr = set->set_oi->oi_oa->o_gr;
809         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
810 out:
811         if (tmp_oa)
812                 OBDO_FREE(tmp_oa);
813         RETURN(rc);
814
815 }
816
817 static int brw_done(struct lov_request_set *set)
818 {
819         struct lov_stripe_md *lsm = set->set_oi->oi_md;
820         struct lov_oinfo     *loi = NULL;
821         struct list_head *pos;
822         struct lov_request *req;
823         ENTRY;
824
825         list_for_each (pos, &set->set_list) {
826                 req = list_entry(pos, struct lov_request, rq_link);
827
828                 if (!req->rq_complete || req->rq_rc)
829                         continue;
830
831                 loi = lsm->lsm_oinfo[req->rq_stripe];
832
833                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
834                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
835         }
836
837         RETURN(0);
838 }
839
840 int lov_fini_brw_set(struct lov_request_set *set)
841 {
842         int rc = 0;
843         ENTRY;
844
845         if (set == NULL)
846                 RETURN(0);
847         LASSERT(set->set_exp);
848         if (set->set_completes) {
849                 rc = brw_done(set);
850                 /* FIXME update qos data here */
851         }
852         lov_put_reqset(set);
853
854         RETURN(rc);
855 }
856
857 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
858                      obd_count oa_bufs, struct brw_page *pga,
859                      struct obd_trans_info *oti,
860                      struct lov_request_set **reqset)
861 {
862         struct {
863                 obd_count       index;
864                 obd_count       count;
865                 obd_count       off;
866         } *info = NULL;
867         struct lov_request_set *set;
868         struct lov_oinfo *loi = NULL;
869         struct lov_obd *lov = &exp->exp_obd->u.lov;
870         int rc = 0, i, shift;
871         ENTRY;
872
873         OBD_ALLOC(set, sizeof(*set));
874         if (set == NULL)
875                 RETURN(-ENOMEM);
876         lov_init_set(set);
877
878         set->set_exp = exp;
879         set->set_oti = oti;
880         set->set_oi = oinfo;
881         set->set_oabufs = oa_bufs;
882         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
883         if (!set->set_pga)
884                 GOTO(out, rc = -ENOMEM);
885
886         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
887         if (!info)
888                 GOTO(out, rc = -ENOMEM);
889
890         /* calculate the page count for each stripe */
891         for (i = 0; i < oa_bufs; i++) {
892                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
893                 info[stripe].count++;
894         }
895
896         /* alloc and initialize lov request */
897         shift = 0;
898         for (i = 0 ; i < oinfo->oi_md->lsm_stripe_count; i++){
899                 struct lov_request *req;
900
901                 if (info[i].count == 0)
902                         continue;
903
904                 loi = oinfo->oi_md->lsm_oinfo[i];
905                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
906                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
907                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
908                         GOTO(out, rc = -EIO);
909                 }
910
911                 OBD_ALLOC(req, sizeof(*req));
912                 if (req == NULL)
913                         GOTO(out, rc = -ENOMEM);
914
915                 OBDO_ALLOC(req->rq_oi.oi_oa);
916                 if (req->rq_oi.oi_oa == NULL) {
917                         OBD_FREE(req, sizeof(*req));
918                         GOTO(out, rc = -ENOMEM);
919                 }
920
921                 if (oinfo->oi_oa) {
922                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
923                                sizeof(*req->rq_oi.oi_oa));
924                 }
925                 req->rq_oi.oi_oa->o_id = loi->loi_id;
926                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
927                 req->rq_oi.oi_oa->o_stripe_idx = i;
928
929                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
930                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
931                 if (req->rq_oi.oi_md == NULL) {
932                         OBDO_FREE(req->rq_oi.oi_oa);
933                         OBD_FREE(req, sizeof(*req));
934                         GOTO(out, rc = -ENOMEM);
935                 }
936
937                 req->rq_idx = loi->loi_ost_idx;
938                 req->rq_stripe = i;
939
940                 /* XXX LOV STACKING */
941                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
942                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
943                 req->rq_oabufs = info[i].count;
944                 req->rq_pgaidx = shift;
945                 shift += req->rq_oabufs;
946
947                 /* remember the index for sort brw_page array */
948                 info[i].index = req->rq_pgaidx;
949
950                 lov_set_add_req(req, set);
951         }
952         if (!set->set_count)
953                 GOTO(out, rc = -EIO);
954
955         /* rotate & sort the brw_page array */
956         for (i = 0; i < oa_bufs; i++) {
957                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
958
959                 shift = info[stripe].index + info[stripe].off;
960                 LASSERT(shift < oa_bufs);
961                 set->set_pga[shift] = pga[i];
962                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
963                                   &set->set_pga[shift].off);
964                 info[stripe].off++;
965         }
966 out:
967         if (info)
968                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
969
970         if (rc == 0)
971                 *reqset = set;
972         else
973                 lov_fini_brw_set(set);
974
975         RETURN(rc);
976 }
977
978 int lov_fini_getattr_set(struct lov_request_set *set)
979 {
980         int rc = 0;
981         ENTRY;
982
983         if (set == NULL)
984                 RETURN(0);
985         LASSERT(set->set_exp);
986         if (set->set_completes)
987                 rc = common_attr_done(set);
988
989         lov_put_reqset(set);
990
991         RETURN(rc);
992 }
993
994 /* The callback for osc_getattr_async that finilizes a request info when a
995  * response is recieved. */
996 static int cb_getattr_update(struct obd_info *oinfo, int rc)
997 {
998         struct lov_request *lovreq;
999         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1000         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1001 }
1002
1003 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
1004                          struct lov_request_set **reqset)
1005 {
1006         struct lov_request_set *set;
1007         struct lov_oinfo *loi = NULL;
1008         struct lov_obd *lov = &exp->exp_obd->u.lov;
1009         int rc = 0, i;
1010         ENTRY;
1011
1012         OBD_ALLOC(set, sizeof(*set));
1013         if (set == NULL)
1014                 RETURN(-ENOMEM);
1015         lov_init_set(set);
1016
1017         set->set_exp = exp;
1018         set->set_oi = oinfo;
1019
1020         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1021                 struct lov_request *req;
1022
1023                 loi = oinfo->oi_md->lsm_oinfo[i];
1024                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1025                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1026                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1027                         continue;
1028                 }
1029
1030                 OBD_ALLOC(req, sizeof(*req));
1031                 if (req == NULL)
1032                         GOTO(out_set, rc = -ENOMEM);
1033
1034                 req->rq_stripe = i;
1035                 req->rq_idx = loi->loi_ost_idx;
1036
1037                 OBDO_ALLOC(req->rq_oi.oi_oa);
1038                 if (req->rq_oi.oi_oa == NULL) {
1039                         OBD_FREE(req, sizeof(*req));
1040                         GOTO(out_set, rc = -ENOMEM);
1041                 }
1042                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1043                        sizeof(*req->rq_oi.oi_oa));
1044                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1045                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1046                 req->rq_oi.oi_cb_up = cb_getattr_update;
1047
1048                 lov_set_add_req(req, set);
1049         }
1050         if (!set->set_count)
1051                 GOTO(out_set, rc = -EIO);
1052         *reqset = set;
1053         RETURN(rc);
1054 out_set:
1055         lov_fini_getattr_set(set);
1056         RETURN(rc);
1057 }
1058
1059 int lov_fini_destroy_set(struct lov_request_set *set)
1060 {
1061         ENTRY;
1062
1063         if (set == NULL)
1064                 RETURN(0);
1065         LASSERT(set->set_exp);
1066         if (set->set_completes) {
1067                 /* FIXME update qos data here */
1068         }
1069
1070         lov_put_reqset(set);
1071
1072         RETURN(0);
1073 }
1074
1075 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1076                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1077                          struct obd_trans_info *oti,
1078                          struct lov_request_set **reqset)
1079 {
1080         struct lov_request_set *set;
1081         struct lov_oinfo *loi = NULL;
1082         struct lov_obd *lov = &exp->exp_obd->u.lov;
1083         int rc = 0, i;
1084         ENTRY;
1085
1086         OBD_ALLOC(set, sizeof(*set));
1087         if (set == NULL)
1088                 RETURN(-ENOMEM);
1089         lov_init_set(set);
1090
1091         set->set_exp = exp;
1092         set->set_oi = oinfo;
1093         set->set_oi->oi_md = lsm;
1094         set->set_oi->oi_oa = src_oa;
1095         set->set_oti = oti;
1096         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1097                 set->set_cookies = oti->oti_logcookies;
1098
1099         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1100                 struct lov_request *req;
1101
1102                 loi = lsm->lsm_oinfo[i];
1103                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1104                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1105                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1106                         continue;
1107                 }
1108
1109                 OBD_ALLOC(req, sizeof(*req));
1110                 if (req == NULL)
1111                         GOTO(out_set, rc = -ENOMEM);
1112
1113                 req->rq_stripe = i;
1114                 req->rq_idx = loi->loi_ost_idx;
1115
1116                 OBDO_ALLOC(req->rq_oi.oi_oa);
1117                 if (req->rq_oi.oi_oa == NULL) {
1118                         OBD_FREE(req, sizeof(*req));
1119                         GOTO(out_set, rc = -ENOMEM);
1120                 }
1121                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1122                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1123                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1124                 lov_set_add_req(req, set);
1125         }
1126         if (!set->set_count)
1127                 GOTO(out_set, rc = -EIO);
1128         *reqset = set;
1129         RETURN(rc);
1130 out_set:
1131         lov_fini_destroy_set(set);
1132         RETURN(rc);
1133 }
1134
1135 int lov_fini_setattr_set(struct lov_request_set *set)
1136 {
1137         int rc = 0;
1138         ENTRY;
1139
1140         if (set == NULL)
1141                 RETURN(0);
1142         LASSERT(set->set_exp);
1143         if (set->set_completes) {
1144                 rc = common_attr_done(set);
1145                 /* FIXME update qos data here */
1146         }
1147
1148         lov_put_reqset(set);
1149         RETURN(rc);
1150 }
1151
1152 int lov_update_setattr_set(struct lov_request_set *set,
1153                            struct lov_request *req, int rc)
1154 {
1155         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1156         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1157         ENTRY;
1158
1159         lov_update_set(set, req, rc);
1160
1161         /* grace error on inactive ost */
1162         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1163                     lov->lov_tgts[req->rq_idx]->ltd_active))
1164                 rc = 0;
1165
1166         if (rc == 0) {
1167                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1168                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1169                                 req->rq_oi.oi_oa->o_ctime;
1170                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1171                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1172                                 req->rq_oi.oi_oa->o_mtime;
1173                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1174                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1175                                 req->rq_oi.oi_oa->o_atime;
1176         }
1177
1178         RETURN(rc);
1179 }
1180
1181 /* The callback for osc_setattr_async that finilizes a request info when a
1182  * response is recieved. */
1183 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1184 {
1185         struct lov_request *lovreq;
1186         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1187         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1188 }
1189
1190 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1191                          struct obd_trans_info *oti,
1192                          struct lov_request_set **reqset)
1193 {
1194         struct lov_request_set *set;
1195         struct lov_oinfo *loi = NULL;
1196         struct lov_obd *lov = &exp->exp_obd->u.lov;
1197         int rc = 0, i;
1198         ENTRY;
1199
1200         OBD_ALLOC(set, sizeof(*set));
1201         if (set == NULL)
1202                 RETURN(-ENOMEM);
1203         lov_init_set(set);
1204
1205         set->set_exp = exp;
1206         set->set_oti = oti;
1207         set->set_oi = oinfo;
1208         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1209                 set->set_cookies = oti->oti_logcookies;
1210
1211         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1212                 struct lov_request *req;
1213
1214                 loi = oinfo->oi_md->lsm_oinfo[i];
1215                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1216                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1217                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1218                         continue;
1219                 }
1220
1221                 OBD_ALLOC(req, sizeof(*req));
1222                 if (req == NULL)
1223                         GOTO(out_set, rc = -ENOMEM);
1224                 req->rq_stripe = i;
1225                 req->rq_idx = loi->loi_ost_idx;
1226
1227                 OBDO_ALLOC(req->rq_oi.oi_oa);
1228                 if (req->rq_oi.oi_oa == NULL) {
1229                         OBD_FREE(req, sizeof(*req));
1230                         GOTO(out_set, rc = -ENOMEM);
1231                 }
1232                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1233                        sizeof(*req->rq_oi.oi_oa));
1234                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1235                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1236                 req->rq_oi.oi_oa->o_stripe_idx = i;
1237                 req->rq_oi.oi_cb_up = cb_setattr_update;
1238
1239                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1240                         int off = lov_stripe_offset(oinfo->oi_md,
1241                                                     oinfo->oi_oa->o_size, i,
1242                                                     &req->rq_oi.oi_oa->o_size);
1243
1244                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1245                                 req->rq_oi.oi_oa->o_size--;
1246
1247                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1248                                i, req->rq_oi.oi_oa->o_size,
1249                                oinfo->oi_oa->o_size);
1250                 }
1251                 lov_set_add_req(req, set);
1252         }
1253         if (!set->set_count)
1254                 GOTO(out_set, rc = -EIO);
1255         *reqset = set;
1256         RETURN(rc);
1257 out_set:
1258         lov_fini_setattr_set(set);
1259         RETURN(rc);
1260 }
1261
1262 int lov_fini_punch_set(struct lov_request_set *set)
1263 {
1264         int rc = 0;
1265         ENTRY;
1266
1267         if (set == NULL)
1268                 RETURN(0);
1269         LASSERT(set->set_exp);
1270         if (set->set_completes) {
1271                 rc = -EIO;
1272                 /* FIXME update qos data here */
1273                 if (set->set_success)
1274                         rc = common_attr_done(set);
1275         }
1276
1277         lov_put_reqset(set);
1278
1279         RETURN(rc);
1280 }
1281
1282 int lov_update_punch_set(struct lov_request_set *set,
1283                            struct lov_request *req, int rc)
1284 {
1285         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1286         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1287         ENTRY;
1288
1289         lov_update_set(set, req, rc);
1290
1291         /* grace error on inactive ost */
1292         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1293                 rc = 0;
1294
1295         if (rc == 0) {
1296                 lov_stripe_lock(lsm);
1297                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1298                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1299                                 req->rq_oi.oi_oa->o_blocks;
1300                 }
1301
1302                 /* Do we need to update lvb_size here? It needn't because
1303                  * it have been done in ll_truncate(). -jay */
1304                 lov_stripe_unlock(lsm);
1305         }
1306
1307         RETURN(rc);
1308 }
1309
1310 /* The callback for osc_punch that finilizes a request info when a response
1311  * is recieved. */
1312 static int cb_update_punch(struct obd_info *oinfo, int rc)
1313 {
1314         struct lov_request *lovreq;
1315         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1316         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1317 }
1318
1319 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1320                        struct obd_trans_info *oti,
1321                        struct lov_request_set **reqset)
1322 {
1323         struct lov_request_set *set;
1324         struct lov_oinfo *loi = NULL;
1325         struct lov_obd *lov = &exp->exp_obd->u.lov;
1326         int rc = 0, i;
1327         ENTRY;
1328
1329         OBD_ALLOC(set, sizeof(*set));
1330         if (set == NULL)
1331                 RETURN(-ENOMEM);
1332         lov_init_set(set);
1333
1334         set->set_oi = oinfo;
1335         set->set_exp = exp;
1336
1337         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1338                 struct lov_request *req;
1339                 obd_off rs, re;
1340
1341                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1342                                            oinfo->oi_policy.l_extent.start,
1343                                            oinfo->oi_policy.l_extent.end,
1344                                            &rs, &re))
1345                         continue;
1346
1347                 loi = oinfo->oi_md->lsm_oinfo[i];
1348                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1349                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1350                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1351                         GOTO(out_set, rc = -EIO);
1352                 }
1353
1354                 OBD_ALLOC(req, sizeof(*req));
1355                 if (req == NULL)
1356                         GOTO(out_set, rc = -ENOMEM);
1357                 req->rq_stripe = i;
1358                 req->rq_idx = loi->loi_ost_idx;
1359
1360                 OBDO_ALLOC(req->rq_oi.oi_oa);
1361                 if (req->rq_oi.oi_oa == NULL) {
1362                         OBD_FREE(req, sizeof(*req));
1363                         GOTO(out_set, rc = -ENOMEM);
1364                 }
1365                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1366                        sizeof(*req->rq_oi.oi_oa));
1367                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1368                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1369                 req->rq_oi.oi_oa->o_stripe_idx = i;
1370                 req->rq_oi.oi_cb_up = cb_update_punch;
1371
1372                 req->rq_oi.oi_policy.l_extent.start = rs;
1373                 req->rq_oi.oi_policy.l_extent.end = re;
1374                 req->rq_oi.oi_policy.l_extent.gid = -1;
1375
1376                 lov_set_add_req(req, set);
1377         }
1378         if (!set->set_count)
1379                 GOTO(out_set, rc = -EIO);
1380         *reqset = set;
1381         RETURN(rc);
1382 out_set:
1383         lov_fini_punch_set(set);
1384         RETURN(rc);
1385 }
1386
1387 int lov_fini_sync_set(struct lov_request_set *set)
1388 {
1389         int rc = 0;
1390         ENTRY;
1391
1392         if (set == NULL)
1393                 RETURN(0);
1394         LASSERT(set->set_exp);
1395         if (set->set_completes) {
1396                 if (!set->set_success)
1397                         rc = -EIO;
1398                 /* FIXME update qos data here */
1399         }
1400
1401         lov_put_reqset(set);
1402
1403         RETURN(rc);
1404 }
1405
1406 /* The callback for osc_sync that finilizes a request info when a
1407  * response is recieved. */
1408 static int cb_sync_update(struct obd_info *oinfo, int rc)
1409 {
1410         struct lov_request *lovreq;
1411         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1412         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1413 }
1414
1415 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1416                       obd_off start, obd_off end,
1417                       struct lov_request_set **reqset)
1418 {
1419         struct lov_request_set *set;
1420         struct lov_oinfo *loi = NULL;
1421         struct lov_obd *lov = &exp->exp_obd->u.lov;
1422         int rc = 0, i;
1423         ENTRY;
1424
1425         OBD_ALLOC(set, sizeof(*set));
1426         if (set == NULL)
1427                 RETURN(-ENOMEM);
1428         lov_init_set(set);
1429
1430         set->set_exp = exp;
1431         set->set_oi = oinfo;
1432         set->set_oi->oi_md = oinfo->oi_md;
1433         set->set_oi->oi_oa = oinfo->oi_oa;
1434
1435         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1436                 struct lov_request *req;
1437                 obd_off rs, re;
1438
1439                 loi = oinfo->oi_md->lsm_oinfo[i];
1440                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1441                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1442                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1443                         continue;
1444                 }
1445
1446                 if (!lov_stripe_intersects(oinfo->oi_md, i, start,
1447                                            end, &rs, &re))
1448                         continue;
1449
1450                 OBD_ALLOC(req, sizeof(*req));
1451                 if (req == NULL)
1452                         GOTO(out_set, rc = -ENOMEM);
1453                 req->rq_stripe = i;
1454                 req->rq_idx = loi->loi_ost_idx;
1455
1456                 OBDO_ALLOC(req->rq_oi.oi_oa);
1457                 if (req->rq_oi.oi_oa == NULL) {
1458                         OBD_FREE(req, sizeof(*req));
1459                         GOTO(out_set, rc = -ENOMEM);
1460                 }
1461                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1462                        sizeof(*req->rq_oi.oi_oa));
1463                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1464                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1465                 req->rq_oi.oi_oa->o_stripe_idx = i;
1466
1467                 req->rq_oi.oi_policy.l_extent.start = rs;
1468                 req->rq_oi.oi_policy.l_extent.end = re;
1469                 req->rq_oi.oi_policy.l_extent.gid = -1;
1470                 req->rq_oi.oi_cb_up = cb_sync_update;
1471
1472                 lov_set_add_req(req, set);
1473         }
1474         if (!set->set_count)
1475                 GOTO(out_set, rc = -EIO);
1476         *reqset = set;
1477         RETURN(rc);
1478 out_set:
1479         lov_fini_sync_set(set);
1480         RETURN(rc);
1481 }
1482
/* Largest value a __u64 can hold. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating add: tot += add, pinned at LOV_U64_MAX when the sum wraps.
 * Both arguments are evaluated more than once, so they must be free of
 * side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1491
1492 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1493 {
1494         ENTRY;
1495
1496         if (success) {
1497                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1498
1499                 if (osfs->os_files != LOV_U64_MAX)
1500                         do_div(osfs->os_files, expected_stripes);
1501                 if (osfs->os_ffree != LOV_U64_MAX)
1502                         do_div(osfs->os_ffree, expected_stripes);
1503
1504                 spin_lock(&obd->obd_osfs_lock);
1505                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1506                 obd->obd_osfs_age = cfs_time_current_64();
1507                 spin_unlock(&obd->obd_osfs_lock);
1508                 RETURN(0);
1509         }
1510
1511         RETURN(-EIO);
1512 }
1513
1514 int lov_fini_statfs_set(struct lov_request_set *set)
1515 {
1516         int rc = 0;
1517         ENTRY;
1518
1519         if (set == NULL)
1520                 RETURN(0);
1521
1522         if (set->set_completes) {
1523                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1524                                      set->set_success);
1525         }
1526         lov_put_reqset(set);
1527         RETURN(rc);
1528 }
1529
1530 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1531                        int success)
1532 {
1533         int shift = 0, quit = 0;
1534         __u64 tmp;
1535
1536         if (success == 0) {
1537                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1538         } else {
1539                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1540                         /* assume all block sizes are always powers of 2 */
1541                         /* get the bits difference */
1542                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1543                         for (shift = 0; shift <= 64; ++shift) {
1544                                 if (tmp & 1) {
1545                                         if (quit)
1546                                                 break;
1547                                         else
1548                                                 quit = 1;
1549                                         shift = 0;
1550                                 }
1551                                 tmp >>= 1;
1552                         }
1553                 }
1554
1555                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1556                         osfs->os_bsize = lov_sfs->os_bsize;
1557
1558                         osfs->os_bfree  >>= shift;
1559                         osfs->os_bavail >>= shift;
1560                         osfs->os_blocks >>= shift;
1561                 } else if (shift != 0) {
1562                         lov_sfs->os_bfree  >>= shift;
1563                         lov_sfs->os_bavail >>= shift;
1564                         lov_sfs->os_blocks >>= shift;
1565                 }
1566 #ifdef MIN_DF
1567                 /* Sandia requested that df (and so, statfs) only
1568                    returned minimal available space on
1569                    a single OST, so people would be able to
1570                    write this much data guaranteed. */
1571                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1572                         /* Presumably if new bavail is smaller,
1573                            new bfree is bigger as well */
1574                         osfs->os_bfree = lov_sfs->os_bfree;
1575                         osfs->os_bavail = lov_sfs->os_bavail;
1576                 }
1577 #else
1578                 osfs->os_bfree += lov_sfs->os_bfree;
1579                 osfs->os_bavail += lov_sfs->os_bavail;
1580 #endif
1581                 osfs->os_blocks += lov_sfs->os_blocks;
1582                 /* XXX not sure about this one - depends on policy.
1583                  *   - could be minimum if we always stripe on all OBDs
1584                  *     (but that would be wrong for any other policy,
1585                  *     if one of the OBDs has no more objects left)
1586                  *   - could be sum if we stripe whole objects
1587                  *   - could be average, just to give a nice number
1588                  *
1589                  * To give a "reasonable" (if not wholly accurate)
1590                  * number, we divide the total number of free objects
1591                  * by expected stripe count (watch out for overflow).
1592                  */
1593                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1594                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1595         }
1596 }
1597
1598 /* The callback for osc_statfs_async that finilizes a request info when a
1599  * response is received. */
1600 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1601 {
1602         struct lov_request *lovreq;
1603         struct obd_statfs *osfs, *lov_sfs;
1604         struct obd_device *obd;
1605         struct lov_obd *lov;
1606         int success;
1607         ENTRY;
1608
1609         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1610         lov = &lovreq->rq_rqset->set_obd->u.lov;
1611         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1612
1613         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1614         lov_sfs = oinfo->oi_osfs;
1615
1616         success = lovreq->rq_rqset->set_success;
1617         /* XXX: the same is done in lov_update_common_set, however
1618            lovset->set_exp is not initialized. */
1619         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1620         if (rc) {
1621                 /* XXX ignore error for disconnected ost ? */
1622                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1623                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1624                         rc = 0;
1625                 GOTO(out, rc);
1626         }
1627
1628         spin_lock(&obd->obd_osfs_lock);
1629         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1630         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1631                 obd->obd_osfs_age = cfs_time_current_64();
1632         spin_unlock(&obd->obd_osfs_lock);
1633
1634         lov_update_statfs(osfs, lov_sfs, success);
1635         qos_update(lov);
1636 out:
1637         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1638             lov_finished_set(lovreq->rq_rqset)) {
1639                lov_statfs_interpret(NULL, lovreq->rq_rqset,
1640                                     lovreq->rq_rqset->set_success !=
1641                                                   lovreq->rq_rqset->set_count);
1642                qos_statfs_done(lov);
1643         }
1644
1645         RETURN(0);
1646 }
1647
1648 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1649                         struct lov_request_set **reqset)
1650 {
1651         struct lov_request_set *set;
1652         struct lov_obd *lov = &obd->u.lov;
1653         int rc = 0, i;
1654         ENTRY;
1655
1656         OBD_ALLOC(set, sizeof(*set));
1657         if (set == NULL)
1658                 RETURN(-ENOMEM);
1659         lov_init_set(set);
1660
1661         set->set_obd = obd;
1662         set->set_oi = oinfo;
1663
1664         /* We only get block data from the OBD */
1665         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1666                 struct lov_request *req;
1667
1668                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1669                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1670                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1671                         continue;
1672                 }
1673
1674                 /* skip targets that have been explicitely disabled by the
1675                  * administrator */
1676                 if (!lov->lov_tgts[i]->ltd_exp) {
1677                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1678                         continue;
1679                 }
1680
1681                 OBD_ALLOC(req, sizeof(*req));
1682                 if (req == NULL)
1683                         GOTO(out_set, rc = -ENOMEM);
1684
1685                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1686                 if (req->rq_oi.oi_osfs == NULL) {
1687                         OBD_FREE(req, sizeof(*req));
1688                         GOTO(out_set, rc = -ENOMEM);
1689                 }
1690
1691                 req->rq_idx = i;
1692                 req->rq_oi.oi_cb_up = cb_statfs_update;
1693                 req->rq_oi.oi_flags = oinfo->oi_flags;
1694
1695                 lov_set_add_req(req, set);
1696         }
1697         if (!set->set_count)
1698                 GOTO(out_set, rc = -EIO);
1699         *reqset = set;
1700         RETURN(rc);
1701 out_set:
1702         lov_fini_statfs_set(set);
1703         RETURN(rc);
1704 }