Whamcloud - gitweb
b=17682 limit performance impact of rpctrace, dlmtrace & quota
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         atomic_set(&set->set_refcount, 1);
62         cfs_waitq_init(&set->set_waitq);
63         spin_lock_init(&set->set_lock);
64 }
65
66 static void lov_finish_set(struct lov_request_set *set)
67 {
68         struct list_head *pos, *n;
69         ENTRY;
70
71         LASSERT(set);
72         list_for_each_safe(pos, n, &set->set_list) {
73                 struct lov_request *req = list_entry(pos, struct lov_request,
74                                                      rq_link);
75                 list_del_init(&req->rq_link);
76
77                 if (req->rq_oi.oi_oa)
78                         OBDO_FREE(req->rq_oi.oi_oa);
79                 if (req->rq_oi.oi_md)
80                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
81                 if (req->rq_oi.oi_osfs)
82                         OBD_FREE(req->rq_oi.oi_osfs,
83                                  sizeof(*req->rq_oi.oi_osfs));
84                 OBD_FREE(req, sizeof(*req));
85         }
86
87         if (set->set_pga) {
88                 int len = set->set_oabufs * sizeof(*set->set_pga);
89                 OBD_FREE(set->set_pga, len);
90         }
91         if (set->set_lockh)
92                 lov_llh_put(set->set_lockh);
93
94         OBD_FREE(set, sizeof(*set));
95         EXIT;
96 }
97
98 int lov_finished_set(struct lov_request_set *set)
99 {
100         CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
101                set->set_count);
102         return set->set_completes == set->set_count;
103 }
104
105
106 void lov_update_set(struct lov_request_set *set,
107                     struct lov_request *req, int rc)
108 {
109         req->rq_complete = 1;
110         req->rq_rc = rc;
111
112         set->set_completes++;
113         if (rc == 0)
114                 set->set_success++;
115
116         cfs_waitq_signal(&set->set_waitq);
117 }
118
119 int lov_update_common_set(struct lov_request_set *set,
120                           struct lov_request *req, int rc)
121 {
122         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
123         ENTRY;
124
125         lov_update_set(set, req, rc);
126
127         /* grace error on inactive ost */
128         if (rc && !(lov->lov_tgts[req->rq_idx] &&
129                     lov->lov_tgts[req->rq_idx]->ltd_active))
130                 rc = 0;
131
132         /* FIXME in raid1 regime, should return 0 */
133         RETURN(rc);
134 }
135
136 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
137 {
138         list_add_tail(&req->rq_link, &set->set_list);
139         set->set_count++;
140         req->rq_rqset = set;
141 }
142
143 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
144                                struct lov_oinfo *loi, int flags,
145                                struct ost_lvb *lvb, __u32 mode, int rc);
146
147 static int lov_update_enqueue_lov(struct obd_export *exp,
148                                   struct lustre_handle *lov_lockhp,
149                                   struct lov_oinfo *loi, int flags, int idx,
150                                   __u64 oid, int rc)
151 {
152         struct lov_obd *lov = &exp->exp_obd->u.lov;
153
154         if (rc != ELDLM_OK &&
155             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
156                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
157                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
158                         /* -EUSERS used by OST to report file contention */
159                         if (rc != -EINTR && rc != -EUSERS)
160                                 CERROR("enqueue objid "LPX64" subobj "
161                                        LPX64" on OST idx %d: rc %d\n",
162                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
163                 } else
164                         rc = ELDLM_OK;
165         }
166         return rc;
167 }
168
169 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
170 {
171         struct lov_request_set *set = req->rq_rqset;
172         struct lustre_handle *lov_lockhp;
173         struct obd_info *oi = set->set_oi;
174         struct lov_oinfo *loi;
175         ENTRY;
176
177         LASSERT(oi != NULL);
178
179         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
180         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
181
182         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
183          * and that copy can be arbitrarily out of date.
184          *
185          * The LOV API is due for a serious rewriting anyways, and this
186          * can be addressed then. */
187
188         lov_stripe_lock(oi->oi_md);
189         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
190                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
191         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
192                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
193         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
194                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
195         lov_stripe_unlock(oi->oi_md);
196         lov_update_set(set, req, rc);
197         RETURN(rc);
198 }
199
200 /* The callback for osc_enqueue that updates lov info for every OSC request. */
201 static int cb_update_enqueue(void *cookie, int rc)
202 {
203         struct obd_info *oinfo = cookie;
204         struct ldlm_enqueue_info *einfo;
205         struct lov_request *lovreq;
206
207         lovreq = container_of(oinfo, struct lov_request, rq_oi);
208         einfo = lovreq->rq_rqset->set_ei;
209         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
210 }
211
212 static int enqueue_done(struct lov_request_set *set, __u32 mode)
213 {
214         struct lov_request *req;
215         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
216         int rc = 0;
217         ENTRY;
218
219         /* enqueue/match success, just return */
220         if (set->set_completes && set->set_completes == set->set_success)
221                 RETURN(0);
222
223         /* cancel enqueued/matched locks */
224         list_for_each_entry(req, &set->set_list, rq_link) {
225                 struct lustre_handle *lov_lockhp;
226
227                 if (!req->rq_complete || req->rq_rc)
228                         continue;
229
230                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
231                 LASSERT(lov_lockhp);
232                 if (!lustre_handle_is_used(lov_lockhp))
233                         continue;
234
235                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
236                                 req->rq_oi.oi_md, mode, lov_lockhp);
237                 if (rc && lov->lov_tgts[req->rq_idx] &&
238                     lov->lov_tgts[req->rq_idx]->ltd_active)
239                         CERROR("cancelling obdjid "LPX64" on OST "
240                                "idx %d error: rc = %d\n",
241                                req->rq_oi.oi_md->lsm_object_id,
242                                req->rq_idx, rc);
243         }
244         if (set->set_lockh)
245                 lov_llh_put(set->set_lockh);
246         RETURN(rc);
247 }
248
249 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
250                          struct ptlrpc_request_set *rqset)
251 {
252         int ret = 0;
253         ENTRY;
254
255         if (set == NULL)
256                 RETURN(0);
257         LASSERT(set->set_exp);
258         /* Do enqueue_done only for sync requests and if any request
259          * succeeded. */
260         if (!rqset) {
261                 if (rc)
262                         set->set_completes = 0;
263                 ret = enqueue_done(set, mode);
264         } else if (set->set_lockh)
265                 lov_llh_put(set->set_lockh);
266
267         if (atomic_dec_and_test(&set->set_refcount))
268                 lov_finish_set(set);
269
270         RETURN(rc ? rc : ret);
271 }
272
273 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
274                          struct ldlm_enqueue_info *einfo,
275                          struct lov_request_set **reqset)
276 {
277         struct lov_obd *lov = &exp->exp_obd->u.lov;
278         struct lov_request_set *set;
279         int i, rc = 0;
280         ENTRY;
281
282         OBD_ALLOC(set, sizeof(*set));
283         if (set == NULL)
284                 RETURN(-ENOMEM);
285         lov_init_set(set);
286
287         set->set_exp = exp;
288         set->set_oi = oinfo;
289         set->set_ei = einfo;
290         set->set_lockh = lov_llh_new(oinfo->oi_md);
291         if (set->set_lockh == NULL)
292                 GOTO(out_set, rc = -ENOMEM);
293         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
294
295         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
296                 struct lov_oinfo *loi;
297                 struct lov_request *req;
298                 obd_off start, end;
299
300                 loi = oinfo->oi_md->lsm_oinfo[i];
301                 if (!lov_stripe_intersects(oinfo->oi_md, i,
302                                            oinfo->oi_policy.l_extent.start,
303                                            oinfo->oi_policy.l_extent.end,
304                                            &start, &end))
305                         continue;
306
307                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
308                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
309                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
310                         continue;
311                 }
312
313                 OBD_ALLOC(req, sizeof(*req));
314                 if (req == NULL)
315                         GOTO(out_set, rc = -ENOMEM);
316
317                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
318                         sizeof(struct lov_oinfo *) +
319                         sizeof(struct lov_oinfo);
320                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
321                 if (req->rq_oi.oi_md == NULL) {
322                         OBD_FREE(req, sizeof(*req));
323                         GOTO(out_set, rc = -ENOMEM);
324                 }
325                 req->rq_oi.oi_md->lsm_oinfo[0] =
326                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
327                         sizeof(struct lov_oinfo *);
328
329                 /* Set lov request specific parameters. */
330                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
331                 req->rq_oi.oi_cb_up = cb_update_enqueue;
332                 req->rq_oi.oi_flags = oinfo->oi_flags;
333
334                 LASSERT(req->rq_oi.oi_lockh);
335
336                 req->rq_oi.oi_policy.l_extent.gid =
337                         oinfo->oi_policy.l_extent.gid;
338                 req->rq_oi.oi_policy.l_extent.start = start;
339                 req->rq_oi.oi_policy.l_extent.end = end;
340
341                 req->rq_idx = loi->loi_ost_idx;
342                 req->rq_stripe = i;
343
344                 /* XXX LOV STACKING: submd should be from the subobj */
345                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
346                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
347                 req->rq_oi.oi_md->lsm_stripe_count = 0;
348                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
349                         loi->loi_kms_valid;
350                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
351                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
352
353                 lov_set_add_req(req, set);
354         }
355         if (!set->set_count)
356                 GOTO(out_set, rc = -EIO);
357         *reqset = set;
358         RETURN(0);
359 out_set:
360         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
361         RETURN(rc);
362 }
363
364 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
365                          int rc)
366 {
367         int ret = rc;
368         ENTRY;
369
370         if (rc > 0)
371                 ret = 0;
372         else if (rc == 0)
373                 ret = 1;
374         lov_update_set(set, req, ret);
375         RETURN(rc);
376 }
377
378 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
379 {
380         int rc = 0;
381         ENTRY;
382
383         if (set == NULL)
384                 RETURN(0);
385         LASSERT(set->set_exp);
386         rc = enqueue_done(set, mode);
387         if ((set->set_count == set->set_success) &&
388             (flags & LDLM_FL_TEST_LOCK))
389                 lov_llh_put(set->set_lockh);
390
391         if (atomic_dec_and_test(&set->set_refcount))
392                 lov_finish_set(set);
393
394         RETURN(rc);
395 }
396
397 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
398                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
399                        __u32 mode, struct lustre_handle *lockh,
400                        struct lov_request_set **reqset)
401 {
402         struct lov_obd *lov = &exp->exp_obd->u.lov;
403         struct lov_request_set *set;
404         int i, rc = 0;
405         ENTRY;
406
407         OBD_ALLOC(set, sizeof(*set));
408         if (set == NULL)
409                 RETURN(-ENOMEM);
410         lov_init_set(set);
411
412         set->set_exp = exp;
413         set->set_oi = oinfo;
414         set->set_oi->oi_md = lsm;
415         set->set_lockh = lov_llh_new(lsm);
416         if (set->set_lockh == NULL)
417                 GOTO(out_set, rc = -ENOMEM);
418         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
419
420         for (i = 0; i < lsm->lsm_stripe_count; i++){
421                 struct lov_oinfo *loi;
422                 struct lov_request *req;
423                 obd_off start, end;
424
425                 loi = lsm->lsm_oinfo[i];
426                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
427                                            policy->l_extent.end, &start, &end))
428                         continue;
429
430                 /* FIXME raid1 should grace this error */
431                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
432                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
433                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
434                         GOTO(out_set, rc = -EIO);
435                 }
436
437                 OBD_ALLOC(req, sizeof(*req));
438                 if (req == NULL)
439                         GOTO(out_set, rc = -ENOMEM);
440
441                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
442                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
443                 if (req->rq_oi.oi_md == NULL) {
444                         OBD_FREE(req, sizeof(*req));
445                         GOTO(out_set, rc = -ENOMEM);
446                 }
447
448                 req->rq_oi.oi_policy.l_extent.start = start;
449                 req->rq_oi.oi_policy.l_extent.end = end;
450                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
451
452                 req->rq_idx = loi->loi_ost_idx;
453                 req->rq_stripe = i;
454
455                 /* XXX LOV STACKING: submd should be from the subobj */
456                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
457                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
458                 req->rq_oi.oi_md->lsm_stripe_count = 0;
459
460                 lov_set_add_req(req, set);
461         }
462         if (!set->set_count)
463                 GOTO(out_set, rc = -EIO);
464         *reqset = set;
465         RETURN(rc);
466 out_set:
467         lov_fini_match_set(set, mode, 0);
468         RETURN(rc);
469 }
470
471 int lov_fini_cancel_set(struct lov_request_set *set)
472 {
473         int rc = 0;
474         ENTRY;
475
476         if (set == NULL)
477                 RETURN(0);
478
479         LASSERT(set->set_exp);
480         if (set->set_lockh)
481                 lov_llh_put(set->set_lockh);
482
483         if (atomic_dec_and_test(&set->set_refcount))
484                 lov_finish_set(set);
485
486         RETURN(rc);
487 }
488
489 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
490                         struct lov_stripe_md *lsm, __u32 mode,
491                         struct lustre_handle *lockh,
492                         struct lov_request_set **reqset)
493 {
494         struct lov_request_set *set;
495         int i, rc = 0;
496         ENTRY;
497
498         OBD_ALLOC(set, sizeof(*set));
499         if (set == NULL)
500                 RETURN(-ENOMEM);
501         lov_init_set(set);
502
503         set->set_exp = exp;
504         set->set_oi = oinfo;
505         set->set_oi->oi_md = lsm;
506         set->set_lockh = lov_handle2llh(lockh);
507         if (set->set_lockh == NULL) {
508                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
509                 GOTO(out_set, rc = -EINVAL);
510         }
511         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
512
513         for (i = 0; i < lsm->lsm_stripe_count; i++){
514                 struct lov_request *req;
515                 struct lustre_handle *lov_lockhp;
516                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
517
518                 lov_lockhp = set->set_lockh->llh_handles + i;
519                 if (!lustre_handle_is_used(lov_lockhp)) {
520                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
521                                loi->loi_ost_idx, loi->loi_id);
522                         continue;
523                 }
524
525                 OBD_ALLOC(req, sizeof(*req));
526                 if (req == NULL)
527                         GOTO(out_set, rc = -ENOMEM);
528
529                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
530                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
531                 if (req->rq_oi.oi_md == NULL) {
532                         OBD_FREE(req, sizeof(*req));
533                         GOTO(out_set, rc = -ENOMEM);
534                 }
535
536                 req->rq_idx = loi->loi_ost_idx;
537                 req->rq_stripe = i;
538
539                 /* XXX LOV STACKING: submd should be from the subobj */
540                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
541                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
542                 req->rq_oi.oi_md->lsm_stripe_count = 0;
543
544                 lov_set_add_req(req, set);
545         }
546         if (!set->set_count)
547                 GOTO(out_set, rc = -EIO);
548         *reqset = set;
549         RETURN(rc);
550 out_set:
551         lov_fini_cancel_set(set);
552         RETURN(rc);
553 }
554
555 static int create_done(struct obd_export *exp, struct lov_request_set *set,
556                        struct lov_stripe_md **lsmp)
557 {
558         struct lov_obd *lov = &exp->exp_obd->u.lov;
559         struct obd_trans_info *oti = set->set_oti;
560         struct obdo *src_oa = set->set_oi->oi_oa;
561         struct lov_request *req;
562         struct obdo *ret_oa = NULL;
563         int attrset = 0, rc = 0;
564         ENTRY;
565
566         LASSERT(set->set_completes);
567
568         /* try alloc objects on other osts if osc_create fails for
569          * exceptions: RPC failure, ENOSPC, etc */
570         if (set->set_count != set->set_success) {
571                 list_for_each_entry (req, &set->set_list, rq_link) {
572                         if (req->rq_rc == 0)
573                                 continue;
574
575                         set->set_completes--;
576                         req->rq_complete = 0;
577
578                         rc = qos_remedy_create(set, req);
579                         lov_update_create_set(set, req, rc);
580                 }
581         }
582
583         /* no successful creates */
584         if (set->set_success == 0)
585                 GOTO(cleanup, rc);
586
587         if (set->set_count != set->set_success) {
588                 set->set_count = set->set_success;
589                 qos_shrink_lsm(set);
590         }
591
592         OBDO_ALLOC(ret_oa);
593         if (ret_oa == NULL)
594                 GOTO(cleanup, rc = -ENOMEM);
595
596         list_for_each_entry(req, &set->set_list, rq_link) {
597                 if (!req->rq_complete || req->rq_rc)
598                         continue;
599                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
600                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
601                                 req->rq_stripe, &attrset);
602         }
603         if (src_oa->o_valid & OBD_MD_FLSIZE &&
604             ret_oa->o_size != src_oa->o_size) {
605                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
606                        src_oa->o_size, ret_oa->o_size);
607                 LBUG();
608         }
609         ret_oa->o_id = src_oa->o_id;
610         ret_oa->o_gr = src_oa->o_gr;
611         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
612         memcpy(src_oa, ret_oa, sizeof(*src_oa));
613         OBDO_FREE(ret_oa);
614
615         *lsmp = set->set_oi->oi_md;
616         GOTO(done, rc = 0);
617
618 cleanup:
619         list_for_each_entry(req, &set->set_list, rq_link) {
620                 struct obd_export *sub_exp;
621                 int err = 0;
622
623                 if (!req->rq_complete || req->rq_rc)
624                         continue;
625
626                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
627                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
628                                   NULL);
629                 if (err)
630                         CERROR("Failed to uncreate objid "LPX64" subobj "
631                                LPX64" on OST idx %d: rc = %d\n",
632                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
633                                req->rq_idx, rc);
634         }
635         if (*lsmp == NULL)
636                 obd_free_memmd(exp, &set->set_oi->oi_md);
637 done:
638         if (oti && set->set_cookies) {
639                 oti->oti_logcookies = set->set_cookies;
640                 if (!set->set_cookie_sent) {
641                         oti_free_cookies(oti);
642                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
643                 } else {
644                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
645                 }
646         }
647         RETURN(rc);
648 }
649
650 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
651 {
652         int rc = 0;
653         ENTRY;
654
655         if (set == NULL)
656                 RETURN(0);
657         LASSERT(set->set_exp);
658         if (set->set_completes)
659                 rc = create_done(set->set_exp, set, lsmp);
660
661         if (atomic_dec_and_test(&set->set_refcount))
662                 lov_finish_set(set);
663
664         RETURN(rc);
665 }
666
667 int lov_update_create_set(struct lov_request_set *set,
668                           struct lov_request *req, int rc)
669 {
670         struct obd_trans_info *oti = set->set_oti;
671         struct lov_stripe_md *lsm = set->set_oi->oi_md;
672         struct lov_oinfo *loi;
673         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
674         ENTRY;
675
676         if (rc && lov->lov_tgts[req->rq_idx] &&
677             lov->lov_tgts[req->rq_idx]->ltd_active) {
678                 CERROR("error creating fid "LPX64" sub-object"
679                        " on OST idx %d/%d: rc = %d\n",
680                        set->set_oi->oi_oa->o_id, req->rq_idx,
681                        lsm->lsm_stripe_count, rc);
682                 if (rc > 0) {
683                         CERROR("obd_create returned invalid err %d\n", rc);
684                         rc = -EIO;
685                 }
686         }
687
688         spin_lock(&set->set_lock);
689         req->rq_stripe = set->set_success;
690         loi = lsm->lsm_oinfo[req->rq_stripe];
691         if (rc) {
692                 lov_update_set(set, req, rc);
693                 spin_unlock(&set->set_lock);
694                 RETURN(rc);
695         }
696
697         loi->loi_id = req->rq_oi.oi_oa->o_id;
698         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
699         loi->loi_ost_idx = req->rq_idx;
700         loi_init(loi);
701
702         if (oti && set->set_cookies)
703                 ++oti->oti_logcookies;
704         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
705                 set->set_cookie_sent++;
706
707         lov_update_set(set, req, rc);
708         spin_unlock(&set->set_lock);
709
710         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
711                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
712         RETURN(rc);
713 }
714
715 int cb_create_update(void *cookie, int rc)
716 {
717         struct obd_info *oinfo = cookie;
718         struct lov_request *lovreq;
719
720         lovreq = container_of(oinfo, struct lov_request, rq_oi);
721         return lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
722 }
723
724
725 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
726                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
727                         struct obd_trans_info *oti,
728                         struct lov_request_set **reqset)
729 {
730         struct lov_request_set *set;
731         int rc = 0;
732         ENTRY;
733
734         OBD_ALLOC(set, sizeof(*set));
735         if (set == NULL)
736                 RETURN(-ENOMEM);
737         lov_init_set(set);
738
739         set->set_exp = exp;
740         set->set_oi = oinfo;
741         set->set_oi->oi_md = *lsmp;
742         set->set_oi->oi_oa = src_oa;
743         set->set_oti = oti;
744
745         rc = qos_prep_create(exp, set);
746         /* qos_shrink_lsm() may have allocated a new lsm */
747         *lsmp = oinfo->oi_md;
748         if (rc)
749                 lov_fini_create_set(set, lsmp);
750         else
751                 *reqset = set;
752         RETURN(rc);
753 }
754
755 static int common_attr_done(struct lov_request_set *set)
756 {
757         struct list_head *pos;
758         struct lov_request *req;
759         struct obdo *tmp_oa;
760         int rc = 0, attrset = 0;
761         ENTRY;
762
763         LASSERT(set->set_oi != NULL);
764
765         if (set->set_oi->oi_oa == NULL)
766                 RETURN(0);
767
768         if (!set->set_success)
769                 RETURN(-EIO);
770
771         OBDO_ALLOC(tmp_oa);
772         if (tmp_oa == NULL)
773                 GOTO(out, rc = -ENOMEM);
774
775         list_for_each (pos, &set->set_list) {
776                 req = list_entry(pos, struct lov_request, rq_link);
777
778                 if (!req->rq_complete || req->rq_rc)
779                         continue;
780                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
781                         continue;
782                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
783                                 req->rq_oi.oi_oa->o_valid,
784                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
785         }
786         if (!attrset) {
787                 CERROR("No stripes had valid attrs\n");
788                 rc = -EIO;
789         }
790         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
791         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
792 out:
793         if (tmp_oa)
794                 OBDO_FREE(tmp_oa);
795         RETURN(rc);
796
797 }
798
799 static int brw_done(struct lov_request_set *set)
800 {
801         struct lov_stripe_md *lsm = set->set_oi->oi_md;
802         struct lov_oinfo     *loi = NULL;
803         struct list_head *pos;
804         struct lov_request *req;
805         ENTRY;
806
807         list_for_each (pos, &set->set_list) {
808                 req = list_entry(pos, struct lov_request, rq_link);
809
810                 if (!req->rq_complete || req->rq_rc)
811                         continue;
812
813                 loi = lsm->lsm_oinfo[req->rq_stripe];
814
815                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
816                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
817         }
818
819         RETURN(0);
820 }
821
822 int lov_fini_brw_set(struct lov_request_set *set)
823 {
824         int rc = 0;
825         ENTRY;
826
827         if (set == NULL)
828                 RETURN(0);
829         LASSERT(set->set_exp);
830         if (set->set_completes) {
831                 rc = brw_done(set);
832                 /* FIXME update qos data here */
833         }
834         if (atomic_dec_and_test(&set->set_refcount))
835                 lov_finish_set(set);
836
837         RETURN(rc);
838 }
839
840 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
841                      obd_count oa_bufs, struct brw_page *pga,
842                      struct obd_trans_info *oti,
843                      struct lov_request_set **reqset)
844 {
845         struct {
846                 obd_count       index;
847                 obd_count       count;
848                 obd_count       off;
849         } *info = NULL;
850         struct lov_request_set *set;
851         struct lov_obd *lov = &exp->exp_obd->u.lov;
852         int rc = 0, i, shift;
853         ENTRY;
854
855         OBD_ALLOC(set, sizeof(*set));
856         if (set == NULL)
857                 RETURN(-ENOMEM);
858         lov_init_set(set);
859
860         set->set_exp = exp;
861         set->set_oti = oti;
862         set->set_oi = oinfo;
863         set->set_oabufs = oa_bufs;
864         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
865         if (!set->set_pga)
866                 GOTO(out, rc = -ENOMEM);
867
868         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
869         if (!info)
870                 GOTO(out, rc = -ENOMEM);
871
872         /* calculate the page count for each stripe */
873         for (i = 0; i < oa_bufs; i++) {
874                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
875                 info[stripe].count++;
876         }
877
878         /* alloc and initialize lov request */
879         shift = 0;
880         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
881                 struct lov_oinfo *loi = NULL;
882                 struct lov_request *req;
883
884                 if (info[i].count == 0)
885                         continue;
886
887                 loi = oinfo->oi_md->lsm_oinfo[i];
888                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
889                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
890                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
891                         GOTO(out, rc = -EIO);
892                 }
893
894                 OBD_ALLOC(req, sizeof(*req));
895                 if (req == NULL)
896                         GOTO(out, rc = -ENOMEM);
897
898                 OBDO_ALLOC(req->rq_oi.oi_oa);
899                 if (req->rq_oi.oi_oa == NULL) {
900                         OBD_FREE(req, sizeof(*req));
901                         GOTO(out, rc = -ENOMEM);
902                 }
903
904                 if (oinfo->oi_oa) {
905                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
906                                sizeof(*req->rq_oi.oi_oa));
907                 }
908                 req->rq_oi.oi_oa->o_id = loi->loi_id;
909                 req->rq_oi.oi_oa->o_stripe_idx = i;
910
911                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
912                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
913                 if (req->rq_oi.oi_md == NULL) {
914                         OBDO_FREE(req->rq_oi.oi_oa);
915                         OBD_FREE(req, sizeof(*req));
916                         GOTO(out, rc = -ENOMEM);
917                 }
918
919                 req->rq_idx = loi->loi_ost_idx;
920                 req->rq_stripe = i;
921
922                 /* XXX LOV STACKING */
923                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
924                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
925                 req->rq_oabufs = info[i].count;
926                 req->rq_pgaidx = shift;
927                 shift += req->rq_oabufs;
928
929                 /* remember the index for sort brw_page array */
930                 info[i].index = req->rq_pgaidx;
931
932                 req->rq_oi.oi_capa = oinfo->oi_capa;
933
934                 lov_set_add_req(req, set);
935         }
936         if (!set->set_count)
937                 GOTO(out, rc = -EIO);
938
939         /* rotate & sort the brw_page array */
940         for (i = 0; i < oa_bufs; i++) {
941                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
942
943                 shift = info[stripe].index + info[stripe].off;
944                 LASSERT(shift < oa_bufs);
945                 set->set_pga[shift] = pga[i];
946                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
947                                   &set->set_pga[shift].off);
948                 info[stripe].off++;
949         }
950 out:
951         if (info)
952                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
953
954         if (rc == 0)
955                 *reqset = set;
956         else
957                 lov_fini_brw_set(set);
958
959         RETURN(rc);
960 }
961
962 int lov_fini_getattr_set(struct lov_request_set *set)
963 {
964         int rc = 0;
965         ENTRY;
966
967         if (set == NULL)
968                 RETURN(0);
969         LASSERT(set->set_exp);
970         if (set->set_completes)
971                 rc = common_attr_done(set);
972
973         if (atomic_dec_and_test(&set->set_refcount))
974                 lov_finish_set(set);
975
976         RETURN(rc);
977 }
978
979 /* The callback for osc_getattr_async that finilizes a request info when a
980  * response is received. */
981 static int cb_getattr_update(void *cookie, int rc)
982 {
983         struct obd_info *oinfo = cookie;
984         struct lov_request *lovreq;
985         lovreq = container_of(oinfo, struct lov_request, rq_oi);
986         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
987 }
988
989 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
990                          struct lov_request_set **reqset)
991 {
992         struct lov_request_set *set;
993         struct lov_obd *lov = &exp->exp_obd->u.lov;
994         int rc = 0, i;
995         ENTRY;
996
997         OBD_ALLOC(set, sizeof(*set));
998         if (set == NULL)
999                 RETURN(-ENOMEM);
1000         lov_init_set(set);
1001
1002         set->set_exp = exp;
1003         set->set_oi = oinfo;
1004
1005         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1006                 struct lov_oinfo *loi;
1007                 struct lov_request *req;
1008
1009                 loi = oinfo->oi_md->lsm_oinfo[i];
1010                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1011                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1012                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1013                         continue;
1014                 }
1015
1016                 OBD_ALLOC(req, sizeof(*req));
1017                 if (req == NULL)
1018                         GOTO(out_set, rc = -ENOMEM);
1019
1020                 req->rq_stripe = i;
1021                 req->rq_idx = loi->loi_ost_idx;
1022
1023                 OBDO_ALLOC(req->rq_oi.oi_oa);
1024                 if (req->rq_oi.oi_oa == NULL) {
1025                         OBD_FREE(req, sizeof(*req));
1026                         GOTO(out_set, rc = -ENOMEM);
1027                 }
1028                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1029                        sizeof(*req->rq_oi.oi_oa));
1030                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1031                 req->rq_oi.oi_cb_up = cb_getattr_update;
1032                 req->rq_oi.oi_capa = oinfo->oi_capa;
1033
1034                 lov_set_add_req(req, set);
1035         }
1036         if (!set->set_count)
1037                 GOTO(out_set, rc = -EIO);
1038         *reqset = set;
1039         RETURN(rc);
1040 out_set:
1041         lov_fini_getattr_set(set);
1042         RETURN(rc);
1043 }
1044
1045 int lov_fini_destroy_set(struct lov_request_set *set)
1046 {
1047         ENTRY;
1048
1049         if (set == NULL)
1050                 RETURN(0);
1051         LASSERT(set->set_exp);
1052         if (set->set_completes) {
1053                 /* FIXME update qos data here */
1054         }
1055
1056         if (atomic_dec_and_test(&set->set_refcount))
1057                 lov_finish_set(set);
1058
1059         RETURN(0);
1060 }
1061
1062 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1063                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1064                          struct obd_trans_info *oti,
1065                          struct lov_request_set **reqset)
1066 {
1067         struct lov_request_set *set;
1068         struct lov_obd *lov = &exp->exp_obd->u.lov;
1069         int rc = 0, i;
1070         ENTRY;
1071
1072         OBD_ALLOC(set, sizeof(*set));
1073         if (set == NULL)
1074                 RETURN(-ENOMEM);
1075         lov_init_set(set);
1076
1077         set->set_exp = exp;
1078         set->set_oi = oinfo;
1079         set->set_oi->oi_md = lsm;
1080         set->set_oi->oi_oa = src_oa;
1081         set->set_oti = oti;
1082         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1083                 set->set_cookies = oti->oti_logcookies;
1084
1085         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1086                 struct lov_oinfo *loi;
1087                 struct lov_request *req;
1088
1089                 loi = lsm->lsm_oinfo[i];
1090                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1091                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1092                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1093                         continue;
1094                 }
1095
1096                 OBD_ALLOC(req, sizeof(*req));
1097                 if (req == NULL)
1098                         GOTO(out_set, rc = -ENOMEM);
1099
1100                 req->rq_stripe = i;
1101                 req->rq_idx = loi->loi_ost_idx;
1102
1103                 OBDO_ALLOC(req->rq_oi.oi_oa);
1104                 if (req->rq_oi.oi_oa == NULL) {
1105                         OBD_FREE(req, sizeof(*req));
1106                         GOTO(out_set, rc = -ENOMEM);
1107                 }
1108                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1109                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1110                 lov_set_add_req(req, set);
1111         }
1112         if (!set->set_count)
1113                 GOTO(out_set, rc = -EIO);
1114         *reqset = set;
1115         RETURN(rc);
1116 out_set:
1117         lov_fini_destroy_set(set);
1118         RETURN(rc);
1119 }
1120
1121 int lov_fini_setattr_set(struct lov_request_set *set)
1122 {
1123         int rc = 0;
1124         ENTRY;
1125
1126         if (set == NULL)
1127                 RETURN(0);
1128         LASSERT(set->set_exp);
1129         if (set->set_completes) {
1130                 rc = common_attr_done(set);
1131                 /* FIXME update qos data here */
1132         }
1133
1134         if (atomic_dec_and_test(&set->set_refcount))
1135                 lov_finish_set(set);
1136         RETURN(rc);
1137 }
1138
1139 int lov_update_setattr_set(struct lov_request_set *set,
1140                            struct lov_request *req, int rc)
1141 {
1142         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1143         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1144         ENTRY;
1145
1146         lov_update_set(set, req, rc);
1147
1148         /* grace error on inactive ost */
1149         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1150                     lov->lov_tgts[req->rq_idx]->ltd_active))
1151                 rc = 0;
1152
1153         if (rc == 0) {
1154                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1155                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1156                                 req->rq_oi.oi_oa->o_ctime;
1157                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1158                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1159                                 req->rq_oi.oi_oa->o_mtime;
1160                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1161                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1162                                 req->rq_oi.oi_oa->o_atime;
1163         }
1164
1165         RETURN(rc);
1166 }
1167
1168 /* The callback for osc_setattr_async that finilizes a request info when a
1169  * response is received. */
1170 static int cb_setattr_update(void *cookie, int rc)
1171 {
1172         struct obd_info *oinfo = cookie;
1173         struct lov_request *lovreq;
1174         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1175         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1176 }
1177
1178 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1179                          struct obd_trans_info *oti,
1180                          struct lov_request_set **reqset)
1181 {
1182         struct lov_request_set *set;
1183         struct lov_obd *lov = &exp->exp_obd->u.lov;
1184         int rc = 0, i;
1185         ENTRY;
1186
1187         OBD_ALLOC(set, sizeof(*set));
1188         if (set == NULL)
1189                 RETURN(-ENOMEM);
1190         lov_init_set(set);
1191
1192         set->set_exp = exp;
1193         set->set_oti = oti;
1194         set->set_oi = oinfo;
1195         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1196                 set->set_cookies = oti->oti_logcookies;
1197
1198         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1199                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1200                 struct lov_request *req;
1201
1202                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1203                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1204                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1205                         continue;
1206                 }
1207
1208                 OBD_ALLOC(req, sizeof(*req));
1209                 if (req == NULL)
1210                         GOTO(out_set, rc = -ENOMEM);
1211                 req->rq_stripe = i;
1212                 req->rq_idx = loi->loi_ost_idx;
1213
1214                 OBDO_ALLOC(req->rq_oi.oi_oa);
1215                 if (req->rq_oi.oi_oa == NULL) {
1216                         OBD_FREE(req, sizeof(*req));
1217                         GOTO(out_set, rc = -ENOMEM);
1218                 }
1219                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1220                        sizeof(*req->rq_oi.oi_oa));
1221                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1222                 req->rq_oi.oi_oa->o_stripe_idx = i;
1223                 req->rq_oi.oi_cb_up = cb_setattr_update;
1224                 req->rq_oi.oi_capa = oinfo->oi_capa;
1225
1226                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1227                         int off = lov_stripe_offset(oinfo->oi_md,
1228                                                     oinfo->oi_oa->o_size, i,
1229                                                     &req->rq_oi.oi_oa->o_size);
1230
1231                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1232                                 req->rq_oi.oi_oa->o_size--;
1233
1234                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1235                                i, req->rq_oi.oi_oa->o_size,
1236                                oinfo->oi_oa->o_size);
1237                 }
1238                 lov_set_add_req(req, set);
1239         }
1240         if (!set->set_count)
1241                 GOTO(out_set, rc = -EIO);
1242         *reqset = set;
1243         RETURN(rc);
1244 out_set:
1245         lov_fini_setattr_set(set);
1246         RETURN(rc);
1247 }
1248
1249 int lov_fini_punch_set(struct lov_request_set *set)
1250 {
1251         int rc = 0;
1252         ENTRY;
1253
1254         if (set == NULL)
1255                 RETURN(0);
1256         LASSERT(set->set_exp);
1257         if (set->set_completes) {
1258                 rc = -EIO;
1259                 /* FIXME update qos data here */
1260                 if (set->set_success)
1261                         rc = common_attr_done(set);
1262         }
1263
1264         if (atomic_dec_and_test(&set->set_refcount))
1265                 lov_finish_set(set);
1266
1267         RETURN(rc);
1268 }
1269
1270 int lov_update_punch_set(struct lov_request_set *set,
1271                          struct lov_request *req, int rc)
1272 {
1273         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1274         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1275         ENTRY;
1276
1277         lov_update_set(set, req, rc);
1278
1279         /* grace error on inactive ost */
1280         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1281                 rc = 0;
1282
1283         if (rc == 0) {
1284                 lov_stripe_lock(lsm);
1285                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1286                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1287                                 req->rq_oi.oi_oa->o_blocks;
1288                 }
1289
1290                 /* Do we need to update lvb_size here? It needn't because
1291                  * it have been done in ll_truncate(). -jay */
1292                 lov_stripe_unlock(lsm);
1293         }
1294
1295         RETURN(rc);
1296 }
1297
1298 /* The callback for osc_punch that finilizes a request info when a response
1299  * is received. */
1300 static int cb_update_punch(void *cookie, int rc)
1301 {
1302         struct obd_info *oinfo = cookie;
1303         struct lov_request *lovreq;
1304         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1305         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1306 }
1307
1308 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1309                        struct obd_trans_info *oti,
1310                        struct lov_request_set **reqset)
1311 {
1312         struct lov_request_set *set;
1313         struct lov_obd *lov = &exp->exp_obd->u.lov;
1314         int rc = 0, i;
1315         ENTRY;
1316
1317         OBD_ALLOC(set, sizeof(*set));
1318         if (set == NULL)
1319                 RETURN(-ENOMEM);
1320         lov_init_set(set);
1321
1322         set->set_oi = oinfo;
1323         set->set_exp = exp;
1324
1325         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1326                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1327                 struct lov_request *req;
1328                 obd_off rs, re;
1329
1330                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1331                                            oinfo->oi_policy.l_extent.start,
1332                                            oinfo->oi_policy.l_extent.end,
1333                                            &rs, &re))
1334                         continue;
1335
1336                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1337                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1338                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1339                         GOTO(out_set, rc = -EIO);
1340                 }
1341
1342                 OBD_ALLOC(req, sizeof(*req));
1343                 if (req == NULL)
1344                         GOTO(out_set, rc = -ENOMEM);
1345                 req->rq_stripe = i;
1346                 req->rq_idx = loi->loi_ost_idx;
1347
1348                 OBDO_ALLOC(req->rq_oi.oi_oa);
1349                 if (req->rq_oi.oi_oa == NULL) {
1350                         OBD_FREE(req, sizeof(*req));
1351                         GOTO(out_set, rc = -ENOMEM);
1352                 }
1353                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1354                        sizeof(*req->rq_oi.oi_oa));
1355                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1356                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1357                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1358
1359                 req->rq_oi.oi_oa->o_stripe_idx = i;
1360                 req->rq_oi.oi_cb_up = cb_update_punch;
1361
1362                 req->rq_oi.oi_policy.l_extent.start = rs;
1363                 req->rq_oi.oi_policy.l_extent.end = re;
1364                 req->rq_oi.oi_policy.l_extent.gid = -1;
1365
1366                 req->rq_oi.oi_capa = oinfo->oi_capa;
1367
1368                 lov_set_add_req(req, set);
1369         }
1370         if (!set->set_count)
1371                 GOTO(out_set, rc = -EIO);
1372         *reqset = set;
1373         RETURN(rc);
1374 out_set:
1375         lov_fini_punch_set(set);
1376         RETURN(rc);
1377 }
1378
1379 int lov_fini_sync_set(struct lov_request_set *set)
1380 {
1381         int rc = 0;
1382         ENTRY;
1383
1384         if (set == NULL)
1385                 RETURN(0);
1386         LASSERT(set->set_exp);
1387         if (set->set_completes) {
1388                 if (!set->set_success)
1389                         rc = -EIO;
1390                 /* FIXME update qos data here */
1391         }
1392
1393         if (atomic_dec_and_test(&set->set_refcount))
1394                 lov_finish_set(set);
1395
1396         RETURN(rc);
1397 }
1398
1399 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1400                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1401                       obd_off start, obd_off end,
1402                       struct lov_request_set **reqset)
1403 {
1404         struct lov_request_set *set;
1405         struct lov_obd *lov = &exp->exp_obd->u.lov;
1406         int rc = 0, i;
1407         ENTRY;
1408
1409         OBD_ALLOC(set, sizeof(*set));
1410         if (set == NULL)
1411                 RETURN(-ENOMEM);
1412         lov_init_set(set);
1413
1414         set->set_exp = exp;
1415         set->set_oi = oinfo;
1416         set->set_oi->oi_md = lsm;
1417         set->set_oi->oi_oa = src_oa;
1418
1419         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1420                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1421                 struct lov_request *req;
1422                 obd_off rs, re;
1423
1424                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1425                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1426                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1427                         continue;
1428                 }
1429
1430                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1431                         continue;
1432
1433                 OBD_ALLOC(req, sizeof(*req));
1434                 if (req == NULL)
1435                         GOTO(out_set, rc = -ENOMEM);
1436                 req->rq_stripe = i;
1437                 req->rq_idx = loi->loi_ost_idx;
1438
1439                 OBDO_ALLOC(req->rq_oi.oi_oa);
1440                 if (req->rq_oi.oi_oa == NULL) {
1441                         OBD_FREE(req, sizeof(*req));
1442                         GOTO(out_set, rc = -ENOMEM);
1443                 }
1444                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1445                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1446                 req->rq_oi.oi_oa->o_stripe_idx = i;
1447
1448                 req->rq_oi.oi_policy.l_extent.start = rs;
1449                 req->rq_oi.oi_policy.l_extent.end = re;
1450                 req->rq_oi.oi_policy.l_extent.gid = -1;
1451
1452                 lov_set_add_req(req, set);
1453         }
1454         if (!set->set_count)
1455                 GOTO(out_set, rc = -EIO);
1456         *reqset = set;
1457         RETURN(rc);
1458 out_set:
1459         lov_fini_sync_set(set);
1460         RETURN(rc);
1461 }
1462
/* Largest value representable in a __u64; also used as the "saturated"
 * marker by LOV_SUM_MAX(). */
#define LOV_U64_MAX ((__u64)~0ULL)

/* Saturating add: tot += add, except that tot is pinned to LOV_U64_MAX when
 * the unsigned sum would wrap around.  Both arguments are evaluated more
 * than once, so they must be free of side effects. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1471
1472 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1473 {
1474         ENTRY;
1475
1476         if (success) {
1477                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1478
1479                 if (osfs->os_files != LOV_U64_MAX)
1480                         do_div(osfs->os_files, expected_stripes);
1481                 if (osfs->os_ffree != LOV_U64_MAX)
1482                         do_div(osfs->os_ffree, expected_stripes);
1483
1484                 spin_lock(&obd->obd_osfs_lock);
1485                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1486                 obd->obd_osfs_age = cfs_time_current_64();
1487                 spin_unlock(&obd->obd_osfs_lock);
1488                 RETURN(0);
1489         }
1490
1491         RETURN(-EIO);
1492 }
1493
1494 int lov_fini_statfs_set(struct lov_request_set *set)
1495 {
1496         int rc = 0;
1497         ENTRY;
1498
1499         if (set == NULL)
1500                 RETURN(0);
1501
1502         if (set->set_completes) {
1503                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1504                                      set->set_success);
1505         }
1506
1507         if (atomic_dec_and_test(&set->set_refcount))
1508                 lov_finish_set(set);
1509
1510         RETURN(rc);
1511 }
1512
1513 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1514                        int success)
1515 {
1516         int shift = 0, quit = 0;
1517         __u64 tmp;
1518
1519         if (success == 0) {
1520                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1521         } else {
1522                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1523                         /* assume all block sizes are always powers of 2 */
1524                         /* get the bits difference */
1525                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1526                         for (shift = 0; shift <= 64; ++shift) {
1527                                 if (tmp & 1) {
1528                                         if (quit)
1529                                                 break;
1530                                         else
1531                                                 quit = 1;
1532                                         shift = 0;
1533                                 }
1534                                 tmp >>= 1;
1535                         }
1536                 }
1537
1538                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1539                         osfs->os_bsize = lov_sfs->os_bsize;
1540
1541                         osfs->os_bfree  >>= shift;
1542                         osfs->os_bavail >>= shift;
1543                         osfs->os_blocks >>= shift;
1544                 } else if (shift != 0) {
1545                         lov_sfs->os_bfree  >>= shift;
1546                         lov_sfs->os_bavail >>= shift;
1547                         lov_sfs->os_blocks >>= shift;
1548                 }
1549 #ifdef MIN_DF
1550                 /* Sandia requested that df (and so, statfs) only
1551                    returned minimal available space on
1552                    a single OST, so people would be able to
1553                    write this much data guaranteed. */
1554                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1555                         /* Presumably if new bavail is smaller,
1556                            new bfree is bigger as well */
1557                         osfs->os_bfree = lov_sfs->os_bfree;
1558                         osfs->os_bavail = lov_sfs->os_bavail;
1559                 }
1560 #else
1561                 osfs->os_bfree += lov_sfs->os_bfree;
1562                 osfs->os_bavail += lov_sfs->os_bavail;
1563 #endif
1564                 osfs->os_blocks += lov_sfs->os_blocks;
1565                 /* XXX not sure about this one - depends on policy.
1566                  *   - could be minimum if we always stripe on all OBDs
1567                  *     (but that would be wrong for any other policy,
1568                  *     if one of the OBDs has no more objects left)
1569                  *   - could be sum if we stripe whole objects
1570                  *   - could be average, just to give a nice number
1571                  *
1572                  * To give a "reasonable" (if not wholly accurate)
1573                  * number, we divide the total number of free objects
1574                  * by expected stripe count (watch out for overflow).
1575                  */
1576                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1577                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1578         }
1579 }
1580
1581 /* The callback for osc_statfs_async that finilizes a request info when a
1582  * response is received. */
1583 static int cb_statfs_update(void *cookie, int rc)
1584 {
1585         struct obd_info *oinfo = cookie;
1586         struct lov_request *lovreq;
1587         struct obd_statfs *osfs, *lov_sfs;
1588         struct lov_obd *lov;
1589         struct lov_tgt_desc *tgt;
1590         struct obd_device *lovobd, *tgtobd;
1591         int success;
1592         ENTRY;
1593
1594         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1595         lovobd = lovreq->rq_rqset->set_obd;
1596         lov = &lovobd->u.lov;
1597         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1598         lov_sfs = oinfo->oi_osfs;
1599         success = lovreq->rq_rqset->set_success;
1600         /* XXX: the same is done in lov_update_common_set, however
1601            lovset->set_exp is not initialized. */
1602         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1603         if (rc)
1604                 GOTO(out, rc);
1605  
1606         obd_getref(lovobd);
1607         tgt = lov->lov_tgts[lovreq->rq_idx];
1608         if (!tgt || !tgt->ltd_active)
1609                 GOTO(out_update, rc);
1610
1611         tgtobd = class_exp2obd(tgt->ltd_exp);
1612         spin_lock(&tgtobd->obd_osfs_lock);
1613         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1614         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1615                 tgtobd->obd_osfs_age = cfs_time_current_64();
1616         spin_unlock(&tgtobd->obd_osfs_lock);
1617
1618 out_update:
1619         lov_update_statfs(osfs, lov_sfs, success);
1620         qos_update(lov);
1621         obd_putref(lovobd);
1622
1623 out:
1624         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1625             lov_finished_set(lovreq->rq_rqset)) {
1626                lov_statfs_interpret(NULL, lovreq->rq_rqset,
1627                                     lovreq->rq_rqset->set_success !=
1628                                                   lovreq->rq_rqset->set_count);
1629                qos_statfs_done(lov);
1630         }
1631
1632         RETURN(0);
1633 }
1634
1635 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1636                         struct lov_request_set **reqset)
1637 {
1638         struct lov_request_set *set;
1639         struct lov_obd *lov = &obd->u.lov;
1640         int rc = 0, i;
1641         ENTRY;
1642
1643         OBD_ALLOC(set, sizeof(*set));
1644         if (set == NULL)
1645                 RETURN(-ENOMEM);
1646         lov_init_set(set);
1647
1648         set->set_obd = obd;
1649         set->set_oi = oinfo;
1650
1651         /* We only get block data from the OBD */
1652         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1653                 struct lov_request *req;
1654
1655                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1656                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1657                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1658                         continue;
1659                 }
1660
1661                 /* skip targets that have been explicitely disabled by the
1662                  * administrator */
1663                 if (!lov->lov_tgts[i]->ltd_exp) {
1664                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1665                         continue;
1666                 }
1667
1668                 OBD_ALLOC(req, sizeof(*req));
1669                 if (req == NULL)
1670                         GOTO(out_set, rc = -ENOMEM);
1671
1672                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1673                 if (req->rq_oi.oi_osfs == NULL) {
1674                         OBD_FREE(req, sizeof(*req));
1675                         GOTO(out_set, rc = -ENOMEM);
1676                 }
1677
1678                 req->rq_idx = i;
1679                 req->rq_oi.oi_cb_up = cb_statfs_update;
1680                 req->rq_oi.oi_flags = oinfo->oi_flags;
1681
1682                 lov_set_add_req(req, set);
1683         }
1684         if (!set->set_count)
1685                 GOTO(out_set, rc = -EIO);
1686         *reqset = set;
1687         RETURN(rc);
1688 out_set:
1689         lov_fini_statfs_set(set);
1690         RETURN(rc);
1691 }