Whamcloud - gitweb
Prepare for Build 33
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         atomic_set(&set->set_refcount, 1);
62         cfs_waitq_init(&set->set_waitq);
63         spin_lock_init(&set->set_lock);
64 }
65
66 void lov_finish_set(struct lov_request_set *set)
67 {
68         struct list_head *pos, *n;
69         ENTRY;
70
71         LASSERT(set);
72         list_for_each_safe(pos, n, &set->set_list) {
73                 struct lov_request *req = list_entry(pos, struct lov_request,
74                                                      rq_link);
75                 list_del_init(&req->rq_link);
76
77                 if (req->rq_oi.oi_oa)
78                         OBDO_FREE(req->rq_oi.oi_oa);
79                 if (req->rq_oi.oi_md)
80                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
81                 if (req->rq_oi.oi_osfs)
82                         OBD_FREE(req->rq_oi.oi_osfs,
83                                  sizeof(*req->rq_oi.oi_osfs));
84                 OBD_FREE(req, sizeof(*req));
85         }
86
87         if (set->set_pga) {
88                 int len = set->set_oabufs * sizeof(*set->set_pga);
89                 OBD_FREE(set->set_pga, len);
90         }
91         if (set->set_lockh)
92                 lov_llh_put(set->set_lockh);
93
94         OBD_FREE(set, sizeof(*set));
95         EXIT;
96 }
97
98 int lov_finished_set(struct lov_request_set *set)
99 {
100         CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
101                set->set_count);
102         return set->set_completes == set->set_count;
103 }
104
105
106 void lov_update_set(struct lov_request_set *set,
107                     struct lov_request *req, int rc)
108 {
109         req->rq_complete = 1;
110         req->rq_rc = rc;
111
112         set->set_completes++;
113         if (rc == 0)
114                 set->set_success++;
115
116         cfs_waitq_signal(&set->set_waitq);
117 }
118
119 int lov_update_common_set(struct lov_request_set *set,
120                           struct lov_request *req, int rc)
121 {
122         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
123         ENTRY;
124
125         lov_update_set(set, req, rc);
126
127         /* grace error on inactive ost */
128         if (rc && !(lov->lov_tgts[req->rq_idx] &&
129                     lov->lov_tgts[req->rq_idx]->ltd_active))
130                 rc = 0;
131
132         /* FIXME in raid1 regime, should return 0 */
133         RETURN(rc);
134 }
135
136 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
137 {
138         list_add_tail(&req->rq_link, &set->set_list);
139         set->set_count++;
140         req->rq_rqset = set;
141 }
142
143 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
144                                struct lov_oinfo *loi, int flags,
145                                struct ost_lvb *lvb, __u32 mode, int rc);
146
147 static int lov_update_enqueue_lov(struct obd_export *exp,
148                                   struct lustre_handle *lov_lockhp,
149                                   struct lov_oinfo *loi, int flags, int idx,
150                                   __u64 oid, int rc)
151 {
152         struct lov_obd *lov = &exp->exp_obd->u.lov;
153
154         if (rc != ELDLM_OK &&
155             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
156                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
157                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
158                         /* -EUSERS used by OST to report file contention */
159                         if (rc != -EINTR && rc != -EUSERS)
160                                 CERROR("enqueue objid "LPX64" subobj "
161                                        LPX64" on OST idx %d: rc %d\n",
162                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
163                 } else
164                         rc = ELDLM_OK;
165         }
166         return rc;
167 }
168
169 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
170 {
171         struct lov_request_set *set = req->rq_rqset;
172         struct lustre_handle *lov_lockhp;
173         struct obd_info *oi = set->set_oi;
174         struct lov_oinfo *loi;
175         ENTRY;
176
177         LASSERT(oi != NULL);
178
179         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
180         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
181
182         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
183          * and that copy can be arbitrarily out of date.
184          *
185          * The LOV API is due for a serious rewriting anyways, and this
186          * can be addressed then. */
187
188         lov_stripe_lock(oi->oi_md);
189         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
190                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
191         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
192                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
193         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
194                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
195         lov_stripe_unlock(oi->oi_md);
196         lov_update_set(set, req, rc);
197         RETURN(rc);
198 }
199
200 /* The callback for osc_enqueue that updates lov info for every OSC request. */
201 static int cb_update_enqueue(void *cookie, int rc)
202 {
203         struct obd_info *oinfo = cookie;
204         struct ldlm_enqueue_info *einfo;
205         struct lov_request *lovreq;
206
207         lovreq = container_of(oinfo, struct lov_request, rq_oi);
208         einfo = lovreq->rq_rqset->set_ei;
209         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
210 }
211
212 static int enqueue_done(struct lov_request_set *set, __u32 mode)
213 {
214         struct lov_request *req;
215         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
216         int rc = 0;
217         ENTRY;
218
219         /* enqueue/match success, just return */
220         if (set->set_completes && set->set_completes == set->set_success)
221                 RETURN(0);
222
223         /* cancel enqueued/matched locks */
224         list_for_each_entry(req, &set->set_list, rq_link) {
225                 struct lustre_handle *lov_lockhp;
226
227                 if (!req->rq_complete || req->rq_rc)
228                         continue;
229
230                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
231                 LASSERT(lov_lockhp);
232                 if (!lustre_handle_is_used(lov_lockhp))
233                         continue;
234
235                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
236                                 req->rq_oi.oi_md, mode, lov_lockhp);
237                 if (rc && lov->lov_tgts[req->rq_idx] &&
238                     lov->lov_tgts[req->rq_idx]->ltd_active)
239                         CERROR("cancelling obdjid "LPX64" on OST "
240                                "idx %d error: rc = %d\n",
241                                req->rq_oi.oi_md->lsm_object_id,
242                                req->rq_idx, rc);
243         }
244         if (set->set_lockh)
245                 lov_llh_put(set->set_lockh);
246         RETURN(rc);
247 }
248
249 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
250                          struct ptlrpc_request_set *rqset)
251 {
252         int ret = 0;
253         ENTRY;
254
255         if (set == NULL)
256                 RETURN(0);
257         LASSERT(set->set_exp);
258         /* Do enqueue_done only for sync requests and if any request
259          * succeeded. */
260         if (!rqset) {
261                 if (rc)
262                         set->set_completes = 0;
263                 ret = enqueue_done(set, mode);
264         } else if (set->set_lockh)
265                 lov_llh_put(set->set_lockh);
266
267         lov_put_reqset(set);
268
269         RETURN(rc ? rc : ret);
270 }
271
272 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
273                          struct ldlm_enqueue_info *einfo,
274                          struct lov_request_set **reqset)
275 {
276         struct lov_obd *lov = &exp->exp_obd->u.lov;
277         struct lov_request_set *set;
278         int i, rc = 0;
279         ENTRY;
280
281         OBD_ALLOC(set, sizeof(*set));
282         if (set == NULL)
283                 RETURN(-ENOMEM);
284         lov_init_set(set);
285
286         set->set_exp = exp;
287         set->set_oi = oinfo;
288         set->set_ei = einfo;
289         set->set_lockh = lov_llh_new(oinfo->oi_md);
290         if (set->set_lockh == NULL)
291                 GOTO(out_set, rc = -ENOMEM);
292         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
293
294         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
295                 struct lov_oinfo *loi;
296                 struct lov_request *req;
297                 obd_off start, end;
298
299                 loi = oinfo->oi_md->lsm_oinfo[i];
300                 if (!lov_stripe_intersects(oinfo->oi_md, i,
301                                            oinfo->oi_policy.l_extent.start,
302                                            oinfo->oi_policy.l_extent.end,
303                                            &start, &end))
304                         continue;
305
306                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
307                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
308                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
309                         continue;
310                 }
311
312                 OBD_ALLOC(req, sizeof(*req));
313                 if (req == NULL)
314                         GOTO(out_set, rc = -ENOMEM);
315
316                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
317                         sizeof(struct lov_oinfo *) +
318                         sizeof(struct lov_oinfo);
319                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
320                 if (req->rq_oi.oi_md == NULL) {
321                         OBD_FREE(req, sizeof(*req));
322                         GOTO(out_set, rc = -ENOMEM);
323                 }
324                 req->rq_oi.oi_md->lsm_oinfo[0] =
325                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
326                         sizeof(struct lov_oinfo *);
327
328                 /* Set lov request specific parameters. */
329                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
330                 req->rq_oi.oi_cb_up = cb_update_enqueue;
331                 req->rq_oi.oi_flags = oinfo->oi_flags;
332
333                 LASSERT(req->rq_oi.oi_lockh);
334
335                 req->rq_oi.oi_policy.l_extent.gid =
336                         oinfo->oi_policy.l_extent.gid;
337                 req->rq_oi.oi_policy.l_extent.start = start;
338                 req->rq_oi.oi_policy.l_extent.end = end;
339
340                 req->rq_idx = loi->loi_ost_idx;
341                 req->rq_stripe = i;
342
343                 /* XXX LOV STACKING: submd should be from the subobj */
344                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
345                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
346                 req->rq_oi.oi_md->lsm_stripe_count = 0;
347                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
348                         loi->loi_kms_valid;
349                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
350                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
351
352                 lov_set_add_req(req, set);
353         }
354         if (!set->set_count)
355                 GOTO(out_set, rc = -EIO);
356         *reqset = set;
357         RETURN(0);
358 out_set:
359         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
360         RETURN(rc);
361 }
362
363 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
364                          int rc)
365 {
366         int ret = rc;
367         ENTRY;
368
369         if (rc > 0)
370                 ret = 0;
371         else if (rc == 0)
372                 ret = 1;
373         lov_update_set(set, req, ret);
374         RETURN(rc);
375 }
376
377 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
378 {
379         int rc = 0;
380         ENTRY;
381
382         if (set == NULL)
383                 RETURN(0);
384         LASSERT(set->set_exp);
385         rc = enqueue_done(set, mode);
386         if ((set->set_count == set->set_success) &&
387             (flags & LDLM_FL_TEST_LOCK))
388                 lov_llh_put(set->set_lockh);
389
390         lov_put_reqset(set);
391
392         RETURN(rc);
393 }
394
395 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
396                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
397                        __u32 mode, struct lustre_handle *lockh,
398                        struct lov_request_set **reqset)
399 {
400         struct lov_obd *lov = &exp->exp_obd->u.lov;
401         struct lov_request_set *set;
402         int i, rc = 0;
403         ENTRY;
404
405         OBD_ALLOC(set, sizeof(*set));
406         if (set == NULL)
407                 RETURN(-ENOMEM);
408         lov_init_set(set);
409
410         set->set_exp = exp;
411         set->set_oi = oinfo;
412         set->set_oi->oi_md = lsm;
413         set->set_lockh = lov_llh_new(lsm);
414         if (set->set_lockh == NULL)
415                 GOTO(out_set, rc = -ENOMEM);
416         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
417
418         for (i = 0; i < lsm->lsm_stripe_count; i++){
419                 struct lov_oinfo *loi;
420                 struct lov_request *req;
421                 obd_off start, end;
422
423                 loi = lsm->lsm_oinfo[i];
424                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
425                                            policy->l_extent.end, &start, &end))
426                         continue;
427
428                 /* FIXME raid1 should grace this error */
429                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
430                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
431                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
432                         GOTO(out_set, rc = -EIO);
433                 }
434
435                 OBD_ALLOC(req, sizeof(*req));
436                 if (req == NULL)
437                         GOTO(out_set, rc = -ENOMEM);
438
439                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
440                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
441                 if (req->rq_oi.oi_md == NULL) {
442                         OBD_FREE(req, sizeof(*req));
443                         GOTO(out_set, rc = -ENOMEM);
444                 }
445
446                 req->rq_oi.oi_policy.l_extent.start = start;
447                 req->rq_oi.oi_policy.l_extent.end = end;
448                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
449
450                 req->rq_idx = loi->loi_ost_idx;
451                 req->rq_stripe = i;
452
453                 /* XXX LOV STACKING: submd should be from the subobj */
454                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
455                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
456                 req->rq_oi.oi_md->lsm_stripe_count = 0;
457
458                 lov_set_add_req(req, set);
459         }
460         if (!set->set_count)
461                 GOTO(out_set, rc = -EIO);
462         *reqset = set;
463         RETURN(rc);
464 out_set:
465         lov_fini_match_set(set, mode, 0);
466         RETURN(rc);
467 }
468
469 int lov_fini_cancel_set(struct lov_request_set *set)
470 {
471         int rc = 0;
472         ENTRY;
473
474         if (set == NULL)
475                 RETURN(0);
476
477         LASSERT(set->set_exp);
478         if (set->set_lockh)
479                 lov_llh_put(set->set_lockh);
480
481         lov_put_reqset(set);
482
483         RETURN(rc);
484 }
485
486 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
487                         struct lov_stripe_md *lsm, __u32 mode,
488                         struct lustre_handle *lockh,
489                         struct lov_request_set **reqset)
490 {
491         struct lov_request_set *set;
492         int i, rc = 0;
493         ENTRY;
494
495         OBD_ALLOC(set, sizeof(*set));
496         if (set == NULL)
497                 RETURN(-ENOMEM);
498         lov_init_set(set);
499
500         set->set_exp = exp;
501         set->set_oi = oinfo;
502         set->set_oi->oi_md = lsm;
503         set->set_lockh = lov_handle2llh(lockh);
504         if (set->set_lockh == NULL) {
505                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
506                 GOTO(out_set, rc = -EINVAL);
507         }
508         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
509
510         for (i = 0; i < lsm->lsm_stripe_count; i++){
511                 struct lov_request *req;
512                 struct lustre_handle *lov_lockhp;
513                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
514
515                 lov_lockhp = set->set_lockh->llh_handles + i;
516                 if (!lustre_handle_is_used(lov_lockhp)) {
517                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
518                                loi->loi_ost_idx, loi->loi_id);
519                         continue;
520                 }
521
522                 OBD_ALLOC(req, sizeof(*req));
523                 if (req == NULL)
524                         GOTO(out_set, rc = -ENOMEM);
525
526                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
527                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
528                 if (req->rq_oi.oi_md == NULL) {
529                         OBD_FREE(req, sizeof(*req));
530                         GOTO(out_set, rc = -ENOMEM);
531                 }
532
533                 req->rq_idx = loi->loi_ost_idx;
534                 req->rq_stripe = i;
535
536                 /* XXX LOV STACKING: submd should be from the subobj */
537                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
538                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
539                 req->rq_oi.oi_md->lsm_stripe_count = 0;
540
541                 lov_set_add_req(req, set);
542         }
543         if (!set->set_count)
544                 GOTO(out_set, rc = -EIO);
545         *reqset = set;
546         RETURN(rc);
547 out_set:
548         lov_fini_cancel_set(set);
549         RETURN(rc);
550 }
551
552 static int create_done(struct obd_export *exp, struct lov_request_set *set,
553                        struct lov_stripe_md **lsmp)
554 {
555         struct lov_obd *lov = &exp->exp_obd->u.lov;
556         struct obd_trans_info *oti = set->set_oti;
557         struct obdo *src_oa = set->set_oi->oi_oa;
558         struct lov_request *req;
559         struct obdo *ret_oa = NULL;
560         int attrset = 0, rc = 0;
561         ENTRY;
562
563         LASSERT(set->set_completes);
564
565         /* try alloc objects on other osts if osc_create fails for
566          * exceptions: RPC failure, ENOSPC, etc */
567         if (set->set_count != set->set_success) {
568                 list_for_each_entry (req, &set->set_list, rq_link) {
569                         if (req->rq_rc == 0)
570                                 continue;
571
572                         set->set_completes--;
573                         req->rq_complete = 0;
574
575                         rc = qos_remedy_create(set, req);
576                         lov_update_create_set(set, req, rc);
577                 }
578         }
579
580         /* no successful creates */
581         if (set->set_success == 0)
582                 GOTO(cleanup, rc);
583
584         if (set->set_count != set->set_success) {
585                 set->set_count = set->set_success;
586                 qos_shrink_lsm(set);
587         }
588
589         OBDO_ALLOC(ret_oa);
590         if (ret_oa == NULL)
591                 GOTO(cleanup, rc = -ENOMEM);
592
593         list_for_each_entry(req, &set->set_list, rq_link) {
594                 if (!req->rq_complete || req->rq_rc)
595                         continue;
596                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
597                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
598                                 req->rq_stripe, &attrset);
599         }
600         if (src_oa->o_valid & OBD_MD_FLSIZE &&
601             ret_oa->o_size != src_oa->o_size) {
602                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
603                        src_oa->o_size, ret_oa->o_size);
604                 LBUG();
605         }
606         ret_oa->o_id = src_oa->o_id;
607         ret_oa->o_gr = src_oa->o_gr;
608         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
609         memcpy(src_oa, ret_oa, sizeof(*src_oa));
610         OBDO_FREE(ret_oa);
611
612         *lsmp = set->set_oi->oi_md;
613         GOTO(done, rc = 0);
614
615 cleanup:
616         list_for_each_entry(req, &set->set_list, rq_link) {
617                 struct obd_export *sub_exp;
618                 int err = 0;
619
620                 if (!req->rq_complete || req->rq_rc)
621                         continue;
622
623                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
624                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
625                                   NULL);
626                 if (err)
627                         CERROR("Failed to uncreate objid "LPX64" subobj "
628                                LPX64" on OST idx %d: rc = %d\n",
629                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
630                                req->rq_idx, rc);
631         }
632         if (*lsmp == NULL)
633                 obd_free_memmd(exp, &set->set_oi->oi_md);
634 done:
635         if (oti && set->set_cookies) {
636                 oti->oti_logcookies = set->set_cookies;
637                 if (!set->set_cookie_sent) {
638                         oti_free_cookies(oti);
639                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
640                 } else {
641                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
642                 }
643         }
644         RETURN(rc);
645 }
646
647 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
648 {
649         int rc = 0;
650         ENTRY;
651
652         if (set == NULL)
653                 RETURN(0);
654         LASSERT(set->set_exp);
655         if (set->set_completes)
656                 rc = create_done(set->set_exp, set, lsmp);
657
658         lov_put_reqset(set);
659         RETURN(rc);
660 }
661
662 int lov_update_create_set(struct lov_request_set *set,
663                           struct lov_request *req, int rc)
664 {
665         struct obd_trans_info *oti = set->set_oti;
666         struct lov_stripe_md *lsm = set->set_oi->oi_md;
667         struct lov_oinfo *loi;
668         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
669         ENTRY;
670
671         if (rc && lov->lov_tgts[req->rq_idx] &&
672             lov->lov_tgts[req->rq_idx]->ltd_active) {
673                 CERROR("error creating fid "LPX64" sub-object"
674                        " on OST idx %d/%d: rc = %d\n",
675                        set->set_oi->oi_oa->o_id, req->rq_idx,
676                        lsm->lsm_stripe_count, rc);
677                 if (rc > 0) {
678                         CERROR("obd_create returned invalid err %d\n", rc);
679                         rc = -EIO;
680                 }
681         }
682
683         spin_lock(&set->set_lock);
684         req->rq_stripe = set->set_success;
685         loi = lsm->lsm_oinfo[req->rq_stripe];
686         if (rc) {
687                 lov_update_set(set, req, rc);
688                 spin_unlock(&set->set_lock);
689                 RETURN(rc);
690         }
691
692         loi->loi_id = req->rq_oi.oi_oa->o_id;
693         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
694         loi->loi_ost_idx = req->rq_idx;
695         loi_init(loi);
696
697         if (oti && set->set_cookies)
698                 ++oti->oti_logcookies;
699         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
700                 set->set_cookie_sent++;
701
702         lov_update_set(set, req, rc);
703         spin_unlock(&set->set_lock);
704
705         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
706                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
707         RETURN(rc);
708 }
709
710 int cb_create_update(void *cookie, int rc)
711 {
712         struct obd_info *oinfo = cookie;
713         struct lov_request *lovreq;
714
715         lovreq = container_of(oinfo, struct lov_request, rq_oi);
716         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
717         if (lov_finished_set(lovreq->rq_rqset))
718                 lov_put_reqset(lovreq->rq_rqset);
719         return rc;
720 }
721
722
723 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
724                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
725                         struct obd_trans_info *oti,
726                         struct lov_request_set **reqset)
727 {
728         struct lov_request_set *set;
729         int rc = 0;
730         ENTRY;
731
732         OBD_ALLOC(set, sizeof(*set));
733         if (set == NULL)
734                 RETURN(-ENOMEM);
735         lov_init_set(set);
736
737         set->set_exp = exp;
738         set->set_oi = oinfo;
739         set->set_oi->oi_md = *lsmp;
740         set->set_oi->oi_oa = src_oa;
741         set->set_oti = oti;
742         lov_get_reqset(set);
743
744         rc = qos_prep_create(exp, set);
745         /* qos_shrink_lsm() may have allocated a new lsm */
746         *lsmp = oinfo->oi_md;
747         if (rc) {
748                 lov_fini_create_set(set, lsmp);
749                 lov_put_reqset(set);
750         } else {
751                 *reqset = set;
752         }
753         RETURN(rc);
754 }
755
756 static int common_attr_done(struct lov_request_set *set)
757 {
758         struct list_head *pos;
759         struct lov_request *req;
760         struct obdo *tmp_oa;
761         int rc = 0, attrset = 0;
762         ENTRY;
763
764         LASSERT(set->set_oi != NULL);
765
766         if (set->set_oi->oi_oa == NULL)
767                 RETURN(0);
768
769         if (!set->set_success)
770                 RETURN(-EIO);
771
772         OBDO_ALLOC(tmp_oa);
773         if (tmp_oa == NULL)
774                 GOTO(out, rc = -ENOMEM);
775
776         list_for_each (pos, &set->set_list) {
777                 req = list_entry(pos, struct lov_request, rq_link);
778
779                 if (!req->rq_complete || req->rq_rc)
780                         continue;
781                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
782                         continue;
783                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
784                                 req->rq_oi.oi_oa->o_valid,
785                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
786         }
787         if (!attrset) {
788                 CERROR("No stripes had valid attrs\n");
789                 rc = -EIO;
790         }
791         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
792             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
793                 /* When we take attributes of some epoch, we require all the
794                  * ost to be active. */
795                 CERROR("Not all the stripes had valid attrs\n");
796                 GOTO(out, rc = -EIO);
797         }
798
799         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
800         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
801 out:
802         if (tmp_oa)
803                 OBDO_FREE(tmp_oa);
804         RETURN(rc);
805
806 }
807
808 static int brw_done(struct lov_request_set *set)
809 {
810         struct lov_stripe_md *lsm = set->set_oi->oi_md;
811         struct lov_oinfo     *loi = NULL;
812         struct list_head *pos;
813         struct lov_request *req;
814         ENTRY;
815
816         list_for_each (pos, &set->set_list) {
817                 req = list_entry(pos, struct lov_request, rq_link);
818
819                 if (!req->rq_complete || req->rq_rc)
820                         continue;
821
822                 loi = lsm->lsm_oinfo[req->rq_stripe];
823
824                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
825                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
826         }
827
828         RETURN(0);
829 }
830
831 int lov_fini_brw_set(struct lov_request_set *set)
832 {
833         int rc = 0;
834         ENTRY;
835
836         if (set == NULL)
837                 RETURN(0);
838         LASSERT(set->set_exp);
839         if (set->set_completes) {
840                 rc = brw_done(set);
841                 /* FIXME update qos data here */
842         }
843         lov_put_reqset(set);
844
845         RETURN(rc);
846 }
847
848 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
849                      obd_count oa_bufs, struct brw_page *pga,
850                      struct obd_trans_info *oti,
851                      struct lov_request_set **reqset)
852 {
853         struct {
854                 obd_count       index;
855                 obd_count       count;
856                 obd_count       off;
857         } *info = NULL;
858         struct lov_request_set *set;
859         struct lov_obd *lov = &exp->exp_obd->u.lov;
860         int rc = 0, i, shift;
861         ENTRY;
862
863         OBD_ALLOC(set, sizeof(*set));
864         if (set == NULL)
865                 RETURN(-ENOMEM);
866         lov_init_set(set);
867
868         set->set_exp = exp;
869         set->set_oti = oti;
870         set->set_oi = oinfo;
871         set->set_oabufs = oa_bufs;
872         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
873         if (!set->set_pga)
874                 GOTO(out, rc = -ENOMEM);
875
876         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
877         if (!info)
878                 GOTO(out, rc = -ENOMEM);
879
880         /* calculate the page count for each stripe */
881         for (i = 0; i < oa_bufs; i++) {
882                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
883                 info[stripe].count++;
884         }
885
886         /* alloc and initialize lov request */
887         shift = 0;
888         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
889                 struct lov_oinfo *loi = NULL;
890                 struct lov_request *req;
891
892                 if (info[i].count == 0)
893                         continue;
894
895                 loi = oinfo->oi_md->lsm_oinfo[i];
896                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
897                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
898                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
899                         GOTO(out, rc = -EIO);
900                 }
901
902                 OBD_ALLOC(req, sizeof(*req));
903                 if (req == NULL)
904                         GOTO(out, rc = -ENOMEM);
905
906                 OBDO_ALLOC(req->rq_oi.oi_oa);
907                 if (req->rq_oi.oi_oa == NULL) {
908                         OBD_FREE(req, sizeof(*req));
909                         GOTO(out, rc = -ENOMEM);
910                 }
911
912                 if (oinfo->oi_oa) {
913                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
914                                sizeof(*req->rq_oi.oi_oa));
915                 }
916                 req->rq_oi.oi_oa->o_id = loi->loi_id;
917                 req->rq_oi.oi_oa->o_stripe_idx = i;
918
919                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
920                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
921                 if (req->rq_oi.oi_md == NULL) {
922                         OBDO_FREE(req->rq_oi.oi_oa);
923                         OBD_FREE(req, sizeof(*req));
924                         GOTO(out, rc = -ENOMEM);
925                 }
926
927                 req->rq_idx = loi->loi_ost_idx;
928                 req->rq_stripe = i;
929
930                 /* XXX LOV STACKING */
931                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
932                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
933                 req->rq_oabufs = info[i].count;
934                 req->rq_pgaidx = shift;
935                 shift += req->rq_oabufs;
936
937                 /* remember the index for sort brw_page array */
938                 info[i].index = req->rq_pgaidx;
939
940                 req->rq_oi.oi_capa = oinfo->oi_capa;
941
942                 lov_set_add_req(req, set);
943         }
944         if (!set->set_count)
945                 GOTO(out, rc = -EIO);
946
947         /* rotate & sort the brw_page array */
948         for (i = 0; i < oa_bufs; i++) {
949                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
950
951                 shift = info[stripe].index + info[stripe].off;
952                 LASSERT(shift < oa_bufs);
953                 set->set_pga[shift] = pga[i];
954                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
955                                   &set->set_pga[shift].off);
956                 info[stripe].off++;
957         }
958 out:
959         if (info)
960                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
961
962         if (rc == 0)
963                 *reqset = set;
964         else
965                 lov_fini_brw_set(set);
966
967         RETURN(rc);
968 }
969
970 int lov_fini_getattr_set(struct lov_request_set *set)
971 {
972         int rc = 0;
973         ENTRY;
974
975         if (set == NULL)
976                 RETURN(0);
977         LASSERT(set->set_exp);
978         if (set->set_completes)
979                 rc = common_attr_done(set);
980
981         lov_put_reqset(set);
982
983         RETURN(rc);
984 }
985
986 /* The callback for osc_getattr_async that finilizes a request info when a
987  * response is received. */
988 static int cb_getattr_update(void *cookie, int rc)
989 {
990         struct obd_info *oinfo = cookie;
991         struct lov_request *lovreq;
992         lovreq = container_of(oinfo, struct lov_request, rq_oi);
993         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
994 }
995
996 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
997                          struct lov_request_set **reqset)
998 {
999         struct lov_request_set *set;
1000         struct lov_obd *lov = &exp->exp_obd->u.lov;
1001         int rc = 0, i;
1002         ENTRY;
1003
1004         OBD_ALLOC(set, sizeof(*set));
1005         if (set == NULL)
1006                 RETURN(-ENOMEM);
1007         lov_init_set(set);
1008
1009         set->set_exp = exp;
1010         set->set_oi = oinfo;
1011
1012         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1013                 struct lov_oinfo *loi;
1014                 struct lov_request *req;
1015
1016                 loi = oinfo->oi_md->lsm_oinfo[i];
1017                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1018                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1019                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1020                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
1021                                 /* SOM requires all the OSTs to be active. */
1022                                 GOTO(out_set, rc = -EIO);
1023                         continue;
1024                 }
1025
1026                 OBD_ALLOC(req, sizeof(*req));
1027                 if (req == NULL)
1028                         GOTO(out_set, rc = -ENOMEM);
1029
1030                 req->rq_stripe = i;
1031                 req->rq_idx = loi->loi_ost_idx;
1032
1033                 OBDO_ALLOC(req->rq_oi.oi_oa);
1034                 if (req->rq_oi.oi_oa == NULL) {
1035                         OBD_FREE(req, sizeof(*req));
1036                         GOTO(out_set, rc = -ENOMEM);
1037                 }
1038                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1039                        sizeof(*req->rq_oi.oi_oa));
1040                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1041                 req->rq_oi.oi_cb_up = cb_getattr_update;
1042                 req->rq_oi.oi_capa = oinfo->oi_capa;
1043
1044                 lov_set_add_req(req, set);
1045         }
1046         if (!set->set_count)
1047                 GOTO(out_set, rc = -EIO);
1048         *reqset = set;
1049         RETURN(rc);
1050 out_set:
1051         lov_fini_getattr_set(set);
1052         RETURN(rc);
1053 }
1054
1055 int lov_fini_destroy_set(struct lov_request_set *set)
1056 {
1057         ENTRY;
1058
1059         if (set == NULL)
1060                 RETURN(0);
1061         LASSERT(set->set_exp);
1062         if (set->set_completes) {
1063                 /* FIXME update qos data here */
1064         }
1065
1066         lov_put_reqset(set);
1067
1068         RETURN(0);
1069 }
1070
1071 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1072                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1073                          struct obd_trans_info *oti,
1074                          struct lov_request_set **reqset)
1075 {
1076         struct lov_request_set *set;
1077         struct lov_obd *lov = &exp->exp_obd->u.lov;
1078         int rc = 0, i;
1079         ENTRY;
1080
1081         OBD_ALLOC(set, sizeof(*set));
1082         if (set == NULL)
1083                 RETURN(-ENOMEM);
1084         lov_init_set(set);
1085
1086         set->set_exp = exp;
1087         set->set_oi = oinfo;
1088         set->set_oi->oi_md = lsm;
1089         set->set_oi->oi_oa = src_oa;
1090         set->set_oti = oti;
1091         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1092                 set->set_cookies = oti->oti_logcookies;
1093
1094         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1095                 struct lov_oinfo *loi;
1096                 struct lov_request *req;
1097
1098                 loi = lsm->lsm_oinfo[i];
1099                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1100                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1101                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1102                         continue;
1103                 }
1104
1105                 OBD_ALLOC(req, sizeof(*req));
1106                 if (req == NULL)
1107                         GOTO(out_set, rc = -ENOMEM);
1108
1109                 req->rq_stripe = i;
1110                 req->rq_idx = loi->loi_ost_idx;
1111
1112                 OBDO_ALLOC(req->rq_oi.oi_oa);
1113                 if (req->rq_oi.oi_oa == NULL) {
1114                         OBD_FREE(req, sizeof(*req));
1115                         GOTO(out_set, rc = -ENOMEM);
1116                 }
1117                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1118                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1119                 lov_set_add_req(req, set);
1120         }
1121         if (!set->set_count)
1122                 GOTO(out_set, rc = -EIO);
1123         *reqset = set;
1124         RETURN(rc);
1125 out_set:
1126         lov_fini_destroy_set(set);
1127         RETURN(rc);
1128 }
1129
1130 int lov_fini_setattr_set(struct lov_request_set *set)
1131 {
1132         int rc = 0;
1133         ENTRY;
1134
1135         if (set == NULL)
1136                 RETURN(0);
1137         LASSERT(set->set_exp);
1138         if (set->set_completes) {
1139                 rc = common_attr_done(set);
1140                 /* FIXME update qos data here */
1141         }
1142
1143         lov_put_reqset(set);
1144         RETURN(rc);
1145 }
1146
1147 int lov_update_setattr_set(struct lov_request_set *set,
1148                            struct lov_request *req, int rc)
1149 {
1150         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1151         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1152         ENTRY;
1153
1154         lov_update_set(set, req, rc);
1155
1156         /* grace error on inactive ost */
1157         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1158                     lov->lov_tgts[req->rq_idx]->ltd_active))
1159                 rc = 0;
1160
1161         if (rc == 0) {
1162                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1163                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1164                                 req->rq_oi.oi_oa->o_ctime;
1165                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1166                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1167                                 req->rq_oi.oi_oa->o_mtime;
1168                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1169                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1170                                 req->rq_oi.oi_oa->o_atime;
1171         }
1172
1173         RETURN(rc);
1174 }
1175
1176 /* The callback for osc_setattr_async that finilizes a request info when a
1177  * response is received. */
1178 static int cb_setattr_update(void *cookie, int rc)
1179 {
1180         struct obd_info *oinfo = cookie;
1181         struct lov_request *lovreq;
1182         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1183         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1184 }
1185
1186 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1187                          struct obd_trans_info *oti,
1188                          struct lov_request_set **reqset)
1189 {
1190         struct lov_request_set *set;
1191         struct lov_obd *lov = &exp->exp_obd->u.lov;
1192         int rc = 0, i;
1193         ENTRY;
1194
1195         OBD_ALLOC(set, sizeof(*set));
1196         if (set == NULL)
1197                 RETURN(-ENOMEM);
1198         lov_init_set(set);
1199
1200         set->set_exp = exp;
1201         set->set_oti = oti;
1202         set->set_oi = oinfo;
1203         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1204                 set->set_cookies = oti->oti_logcookies;
1205
1206         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1207                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1208                 struct lov_request *req;
1209
1210                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1211                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1212                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1213                         continue;
1214                 }
1215
1216                 OBD_ALLOC(req, sizeof(*req));
1217                 if (req == NULL)
1218                         GOTO(out_set, rc = -ENOMEM);
1219                 req->rq_stripe = i;
1220                 req->rq_idx = loi->loi_ost_idx;
1221
1222                 OBDO_ALLOC(req->rq_oi.oi_oa);
1223                 if (req->rq_oi.oi_oa == NULL) {
1224                         OBD_FREE(req, sizeof(*req));
1225                         GOTO(out_set, rc = -ENOMEM);
1226                 }
1227                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1228                        sizeof(*req->rq_oi.oi_oa));
1229                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1230                 req->rq_oi.oi_oa->o_stripe_idx = i;
1231                 req->rq_oi.oi_cb_up = cb_setattr_update;
1232                 req->rq_oi.oi_capa = oinfo->oi_capa;
1233
1234                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1235                         int off = lov_stripe_offset(oinfo->oi_md,
1236                                                     oinfo->oi_oa->o_size, i,
1237                                                     &req->rq_oi.oi_oa->o_size);
1238
1239                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1240                                 req->rq_oi.oi_oa->o_size--;
1241
1242                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1243                                i, req->rq_oi.oi_oa->o_size,
1244                                oinfo->oi_oa->o_size);
1245                 }
1246                 lov_set_add_req(req, set);
1247         }
1248         if (!set->set_count)
1249                 GOTO(out_set, rc = -EIO);
1250         *reqset = set;
1251         RETURN(rc);
1252 out_set:
1253         lov_fini_setattr_set(set);
1254         RETURN(rc);
1255 }
1256
1257 int lov_fini_punch_set(struct lov_request_set *set)
1258 {
1259         int rc = 0;
1260         ENTRY;
1261
1262         if (set == NULL)
1263                 RETURN(0);
1264         LASSERT(set->set_exp);
1265         if (set->set_completes) {
1266                 rc = -EIO;
1267                 /* FIXME update qos data here */
1268                 if (set->set_success)
1269                         rc = common_attr_done(set);
1270         }
1271
1272         lov_put_reqset(set);
1273
1274         RETURN(rc);
1275 }
1276
1277 int lov_update_punch_set(struct lov_request_set *set,
1278                          struct lov_request *req, int rc)
1279 {
1280         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1281         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1282         ENTRY;
1283
1284         lov_update_set(set, req, rc);
1285
1286         /* grace error on inactive ost */
1287         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1288                 rc = 0;
1289
1290         if (rc == 0) {
1291                 lov_stripe_lock(lsm);
1292                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1293                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1294                                 req->rq_oi.oi_oa->o_blocks;
1295                 }
1296
1297                 /* Do we need to update lvb_size here? It needn't because
1298                  * it have been done in ll_truncate(). -jay */
1299                 lov_stripe_unlock(lsm);
1300         }
1301
1302         RETURN(rc);
1303 }
1304
1305 /* The callback for osc_punch that finilizes a request info when a response
1306  * is received. */
1307 static int cb_update_punch(void *cookie, int rc)
1308 {
1309         struct obd_info *oinfo = cookie;
1310         struct lov_request *lovreq;
1311         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1312         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1313 }
1314
1315 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1316                        struct obd_trans_info *oti,
1317                        struct lov_request_set **reqset)
1318 {
1319         struct lov_request_set *set;
1320         struct lov_obd *lov = &exp->exp_obd->u.lov;
1321         int rc = 0, i;
1322         ENTRY;
1323
1324         OBD_ALLOC(set, sizeof(*set));
1325         if (set == NULL)
1326                 RETURN(-ENOMEM);
1327         lov_init_set(set);
1328
1329         set->set_oi = oinfo;
1330         set->set_exp = exp;
1331
1332         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1333                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1334                 struct lov_request *req;
1335                 obd_off rs, re;
1336
1337                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1338                                            oinfo->oi_policy.l_extent.start,
1339                                            oinfo->oi_policy.l_extent.end,
1340                                            &rs, &re))
1341                         continue;
1342
1343                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1344                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1345                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1346                         GOTO(out_set, rc = -EIO);
1347                 }
1348
1349                 OBD_ALLOC(req, sizeof(*req));
1350                 if (req == NULL)
1351                         GOTO(out_set, rc = -ENOMEM);
1352                 req->rq_stripe = i;
1353                 req->rq_idx = loi->loi_ost_idx;
1354
1355                 OBDO_ALLOC(req->rq_oi.oi_oa);
1356                 if (req->rq_oi.oi_oa == NULL) {
1357                         OBD_FREE(req, sizeof(*req));
1358                         GOTO(out_set, rc = -ENOMEM);
1359                 }
1360                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1361                        sizeof(*req->rq_oi.oi_oa));
1362                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1363                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1364                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1365
1366                 req->rq_oi.oi_oa->o_stripe_idx = i;
1367                 req->rq_oi.oi_cb_up = cb_update_punch;
1368
1369                 req->rq_oi.oi_policy.l_extent.start = rs;
1370                 req->rq_oi.oi_policy.l_extent.end = re;
1371                 req->rq_oi.oi_policy.l_extent.gid = -1;
1372
1373                 req->rq_oi.oi_capa = oinfo->oi_capa;
1374
1375                 lov_set_add_req(req, set);
1376         }
1377         if (!set->set_count)
1378                 GOTO(out_set, rc = -EIO);
1379         *reqset = set;
1380         RETURN(rc);
1381 out_set:
1382         lov_fini_punch_set(set);
1383         RETURN(rc);
1384 }
1385
1386 int lov_fini_sync_set(struct lov_request_set *set)
1387 {
1388         int rc = 0;
1389         ENTRY;
1390
1391         if (set == NULL)
1392                 RETURN(0);
1393         LASSERT(set->set_exp);
1394         if (set->set_completes) {
1395                 if (!set->set_success)
1396                         rc = -EIO;
1397                 /* FIXME update qos data here */
1398         }
1399
1400         lov_put_reqset(set);
1401
1402         RETURN(rc);
1403 }
1404
1405 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1406                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1407                       obd_off start, obd_off end,
1408                       struct lov_request_set **reqset)
1409 {
1410         struct lov_request_set *set;
1411         struct lov_obd *lov = &exp->exp_obd->u.lov;
1412         int rc = 0, i;
1413         ENTRY;
1414
1415         OBD_ALLOC(set, sizeof(*set));
1416         if (set == NULL)
1417                 RETURN(-ENOMEM);
1418         lov_init_set(set);
1419
1420         set->set_exp = exp;
1421         set->set_oi = oinfo;
1422         set->set_oi->oi_md = lsm;
1423         set->set_oi->oi_oa = src_oa;
1424
1425         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1426                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1427                 struct lov_request *req;
1428                 obd_off rs, re;
1429
1430                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1431                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1432                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1433                         continue;
1434                 }
1435
1436                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1437                         continue;
1438
1439                 OBD_ALLOC(req, sizeof(*req));
1440                 if (req == NULL)
1441                         GOTO(out_set, rc = -ENOMEM);
1442                 req->rq_stripe = i;
1443                 req->rq_idx = loi->loi_ost_idx;
1444
1445                 OBDO_ALLOC(req->rq_oi.oi_oa);
1446                 if (req->rq_oi.oi_oa == NULL) {
1447                         OBD_FREE(req, sizeof(*req));
1448                         GOTO(out_set, rc = -ENOMEM);
1449                 }
1450                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1451                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1452                 req->rq_oi.oi_oa->o_stripe_idx = i;
1453
1454                 req->rq_oi.oi_policy.l_extent.start = rs;
1455                 req->rq_oi.oi_policy.l_extent.end = re;
1456                 req->rq_oi.oi_policy.l_extent.gid = -1;
1457
1458                 lov_set_add_req(req, set);
1459         }
1460         if (!set->set_count)
1461                 GOTO(out_set, rc = -EIO);
1462         *reqset = set;
1463         RETURN(rc);
1464 out_set:
1465         lov_fini_sync_set(set);
1466         RETURN(rc);
1467 }
1468
/* Largest value representable in a __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)
/* Saturating accumulate: add @add to @tot, pinning @tot at LOV_U64_MAX when
 * the sum would wrap around.  Both arguments are evaluated more than once,
 * so they must not have side effects. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1477
1478 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1479 {
1480         ENTRY;
1481
1482         if (success) {
1483                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1484
1485                 if (osfs->os_files != LOV_U64_MAX)
1486                         do_div(osfs->os_files, expected_stripes);
1487                 if (osfs->os_ffree != LOV_U64_MAX)
1488                         do_div(osfs->os_ffree, expected_stripes);
1489
1490                 spin_lock(&obd->obd_osfs_lock);
1491                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1492                 obd->obd_osfs_age = cfs_time_current_64();
1493                 spin_unlock(&obd->obd_osfs_lock);
1494                 RETURN(0);
1495         }
1496
1497         RETURN(-EIO);
1498 }
1499
1500 int lov_fini_statfs_set(struct lov_request_set *set)
1501 {
1502         int rc = 0;
1503         ENTRY;
1504
1505         if (set == NULL)
1506                 RETURN(0);
1507
1508         if (set->set_completes) {
1509                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1510                                      set->set_success);
1511         }
1512         lov_put_reqset(set);
1513         RETURN(rc);
1514 }
1515
1516 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1517                        int success)
1518 {
1519         int shift = 0, quit = 0;
1520         __u64 tmp;
1521
1522         if (success == 0) {
1523                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1524         } else {
1525                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1526                         /* assume all block sizes are always powers of 2 */
1527                         /* get the bits difference */
1528                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1529                         for (shift = 0; shift <= 64; ++shift) {
1530                                 if (tmp & 1) {
1531                                         if (quit)
1532                                                 break;
1533                                         else
1534                                                 quit = 1;
1535                                         shift = 0;
1536                                 }
1537                                 tmp >>= 1;
1538                         }
1539                 }
1540
1541                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1542                         osfs->os_bsize = lov_sfs->os_bsize;
1543
1544                         osfs->os_bfree  >>= shift;
1545                         osfs->os_bavail >>= shift;
1546                         osfs->os_blocks >>= shift;
1547                 } else if (shift != 0) {
1548                         lov_sfs->os_bfree  >>= shift;
1549                         lov_sfs->os_bavail >>= shift;
1550                         lov_sfs->os_blocks >>= shift;
1551                 }
1552 #ifdef MIN_DF
1553                 /* Sandia requested that df (and so, statfs) only
1554                    returned minimal available space on
1555                    a single OST, so people would be able to
1556                    write this much data guaranteed. */
1557                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1558                         /* Presumably if new bavail is smaller,
1559                            new bfree is bigger as well */
1560                         osfs->os_bfree = lov_sfs->os_bfree;
1561                         osfs->os_bavail = lov_sfs->os_bavail;
1562                 }
1563 #else
1564                 osfs->os_bfree += lov_sfs->os_bfree;
1565                 osfs->os_bavail += lov_sfs->os_bavail;
1566 #endif
1567                 osfs->os_blocks += lov_sfs->os_blocks;
1568                 /* XXX not sure about this one - depends on policy.
1569                  *   - could be minimum if we always stripe on all OBDs
1570                  *     (but that would be wrong for any other policy,
1571                  *     if one of the OBDs has no more objects left)
1572                  *   - could be sum if we stripe whole objects
1573                  *   - could be average, just to give a nice number
1574                  *
1575                  * To give a "reasonable" (if not wholly accurate)
1576                  * number, we divide the total number of free objects
1577                  * by expected stripe count (watch out for overflow).
1578                  */
1579                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1580                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1581         }
1582 }
1583
1584 /* The callback for osc_statfs_async that finilizes a request info when a
1585  * response is received. */
1586 static int cb_statfs_update(void *cookie, int rc)
1587 {
1588         struct obd_info *oinfo = cookie;
1589         struct lov_request *lovreq;
1590         struct obd_statfs *osfs, *lov_sfs;
1591         struct lov_obd *lov;
1592         struct lov_tgt_desc *tgt;
1593         struct obd_device *lovobd, *tgtobd;
1594         int success;
1595         ENTRY;
1596
1597         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1598         lovobd = lovreq->rq_rqset->set_obd;
1599         lov = &lovobd->u.lov;
1600         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1601         lov_sfs = oinfo->oi_osfs;
1602         success = lovreq->rq_rqset->set_success;
1603         /* XXX: the same is done in lov_update_common_set, however
1604            lovset->set_exp is not initialized. */
1605         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1606         if (rc)
1607                 GOTO(out, rc);
1608  
1609         obd_getref(lovobd);
1610         tgt = lov->lov_tgts[lovreq->rq_idx];
1611         if (!tgt || !tgt->ltd_active)
1612                 GOTO(out_update, rc);
1613
1614         tgtobd = class_exp2obd(tgt->ltd_exp);
1615         spin_lock(&tgtobd->obd_osfs_lock);
1616         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1617         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1618                 tgtobd->obd_osfs_age = cfs_time_current_64();
1619         spin_unlock(&tgtobd->obd_osfs_lock);
1620
1621 out_update:
1622         lov_update_statfs(osfs, lov_sfs, success);
1623         qos_update(lov);
1624         obd_putref(lovobd);
1625
1626 out:
1627         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1628             lov_finished_set(lovreq->rq_rqset)) {
1629                lov_statfs_interpret(NULL, lovreq->rq_rqset,
1630                                     lovreq->rq_rqset->set_success !=
1631                                                   lovreq->rq_rqset->set_count);
1632                qos_statfs_done(lov);
1633         }
1634
1635         RETURN(0);
1636 }
1637
1638 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1639                         struct lov_request_set **reqset)
1640 {
1641         struct lov_request_set *set;
1642         struct lov_obd *lov = &obd->u.lov;
1643         int rc = 0, i;
1644         ENTRY;
1645
1646         OBD_ALLOC(set, sizeof(*set));
1647         if (set == NULL)
1648                 RETURN(-ENOMEM);
1649         lov_init_set(set);
1650
1651         set->set_obd = obd;
1652         set->set_oi = oinfo;
1653
1654         /* We only get block data from the OBD */
1655         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1656                 struct lov_request *req;
1657
1658                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1659                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1660                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1661                         continue;
1662                 }
1663
1664                 /* skip targets that have been explicitely disabled by the
1665                  * administrator */
1666                 if (!lov->lov_tgts[i]->ltd_exp) {
1667                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1668                         continue;
1669                 }
1670
1671                 OBD_ALLOC(req, sizeof(*req));
1672                 if (req == NULL)
1673                         GOTO(out_set, rc = -ENOMEM);
1674
1675                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1676                 if (req->rq_oi.oi_osfs == NULL) {
1677                         OBD_FREE(req, sizeof(*req));
1678                         GOTO(out_set, rc = -ENOMEM);
1679                 }
1680
1681                 req->rq_idx = i;
1682                 req->rq_oi.oi_cb_up = cb_statfs_update;
1683                 req->rq_oi.oi_flags = oinfo->oi_flags;
1684
1685                 lov_set_add_req(req, set);
1686         }
1687         if (!set->set_count)
1688                 GOTO(out_set, rc = -EIO);
1689         *reqset = set;
1690         RETURN(rc);
1691 out_set:
1692         lov_fini_statfs_set(set);
1693         RETURN(rc);
1694 }