Whamcloud - gitweb
Update copyrights on source files changed since 2010-02-15.
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         cfs_atomic_set(&set->set_refcount, 1);
62         cfs_waitq_init(&set->set_waitq);
63         cfs_spin_lock_init(&set->set_lock);
64 }
65
66 void lov_finish_set(struct lov_request_set *set)
67 {
68         cfs_list_t *pos, *n;
69         ENTRY;
70
71         LASSERT(set);
72         cfs_list_for_each_safe(pos, n, &set->set_list) {
73                 struct lov_request *req = cfs_list_entry(pos,
74                                                          struct lov_request,
75                                                          rq_link);
76                 cfs_list_del_init(&req->rq_link);
77
78                 if (req->rq_oi.oi_oa)
79                         OBDO_FREE(req->rq_oi.oi_oa);
80                 if (req->rq_oi.oi_md)
81                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
82                 if (req->rq_oi.oi_osfs)
83                         OBD_FREE(req->rq_oi.oi_osfs,
84                                  sizeof(*req->rq_oi.oi_osfs));
85                 OBD_FREE(req, sizeof(*req));
86         }
87
88         if (set->set_pga) {
89                 int len = set->set_oabufs * sizeof(*set->set_pga);
90                 OBD_FREE(set->set_pga, len);
91         }
92         if (set->set_lockh)
93                 lov_llh_put(set->set_lockh);
94
95         OBD_FREE(set, sizeof(*set));
96         EXIT;
97 }
98
99 int lov_finished_set(struct lov_request_set *set)
100 {
101         CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
102                set->set_count);
103         return set->set_completes == set->set_count;
104 }
105
106
107 void lov_update_set(struct lov_request_set *set,
108                     struct lov_request *req, int rc)
109 {
110         req->rq_complete = 1;
111         req->rq_rc = rc;
112
113         set->set_completes++;
114         if (rc == 0)
115                 set->set_success++;
116
117         cfs_waitq_signal(&set->set_waitq);
118 }
119
120 int lov_update_common_set(struct lov_request_set *set,
121                           struct lov_request *req, int rc)
122 {
123         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
124         ENTRY;
125
126         lov_update_set(set, req, rc);
127
128         /* grace error on inactive ost */
129         if (rc && !(lov->lov_tgts[req->rq_idx] &&
130                     lov->lov_tgts[req->rq_idx]->ltd_active))
131                 rc = 0;
132
133         /* FIXME in raid1 regime, should return 0 */
134         RETURN(rc);
135 }
136
137 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
138 {
139         cfs_list_add_tail(&req->rq_link, &set->set_list);
140         set->set_count++;
141         req->rq_rqset = set;
142 }
143
144 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
145                                struct lov_oinfo *loi, int flags,
146                                struct ost_lvb *lvb, __u32 mode, int rc);
147
148 static int lov_update_enqueue_lov(struct obd_export *exp,
149                                   struct lustre_handle *lov_lockhp,
150                                   struct lov_oinfo *loi, int flags, int idx,
151                                   __u64 oid, int rc)
152 {
153         struct lov_obd *lov = &exp->exp_obd->u.lov;
154
155         if (rc != ELDLM_OK &&
156             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
157                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
158                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
159                         /* -EUSERS used by OST to report file contention */
160                         if (rc != -EINTR && rc != -EUSERS)
161                                 CERROR("enqueue objid "LPX64" subobj "
162                                        LPX64" on OST idx %d: rc %d\n",
163                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
164                 } else
165                         rc = ELDLM_OK;
166         }
167         return rc;
168 }
169
170 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
171 {
172         struct lov_request_set *set = req->rq_rqset;
173         struct lustre_handle *lov_lockhp;
174         struct obd_info *oi = set->set_oi;
175         struct lov_oinfo *loi;
176         ENTRY;
177
178         LASSERT(oi != NULL);
179
180         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
181         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
182
183         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
184          * and that copy can be arbitrarily out of date.
185          *
186          * The LOV API is due for a serious rewriting anyways, and this
187          * can be addressed then. */
188
189         lov_stripe_lock(oi->oi_md);
190         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
191                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
192         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
193                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
194         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
195                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
196         lov_stripe_unlock(oi->oi_md);
197         lov_update_set(set, req, rc);
198         RETURN(rc);
199 }
200
201 /* The callback for osc_enqueue that updates lov info for every OSC request. */
202 static int cb_update_enqueue(void *cookie, int rc)
203 {
204         struct obd_info *oinfo = cookie;
205         struct ldlm_enqueue_info *einfo;
206         struct lov_request *lovreq;
207
208         lovreq = container_of(oinfo, struct lov_request, rq_oi);
209         einfo = lovreq->rq_rqset->set_ei;
210         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
211 }
212
213 static int enqueue_done(struct lov_request_set *set, __u32 mode)
214 {
215         struct lov_request *req;
216         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
217         int rc = 0;
218         ENTRY;
219
220         /* enqueue/match success, just return */
221         if (set->set_completes && set->set_completes == set->set_success)
222                 RETURN(0);
223
224         /* cancel enqueued/matched locks */
225         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
226                 struct lustre_handle *lov_lockhp;
227
228                 if (!req->rq_complete || req->rq_rc)
229                         continue;
230
231                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
232                 LASSERT(lov_lockhp);
233                 if (!lustre_handle_is_used(lov_lockhp))
234                         continue;
235
236                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
237                                 req->rq_oi.oi_md, mode, lov_lockhp);
238                 if (rc && lov->lov_tgts[req->rq_idx] &&
239                     lov->lov_tgts[req->rq_idx]->ltd_active)
240                         CERROR("cancelling obdjid "LPX64" on OST "
241                                "idx %d error: rc = %d\n",
242                                req->rq_oi.oi_md->lsm_object_id,
243                                req->rq_idx, rc);
244         }
245         if (set->set_lockh)
246                 lov_llh_put(set->set_lockh);
247         RETURN(rc);
248 }
249
250 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
251                          struct ptlrpc_request_set *rqset)
252 {
253         int ret = 0;
254         ENTRY;
255
256         if (set == NULL)
257                 RETURN(0);
258         LASSERT(set->set_exp);
259         /* Do enqueue_done only for sync requests and if any request
260          * succeeded. */
261         if (!rqset) {
262                 if (rc)
263                         set->set_completes = 0;
264                 ret = enqueue_done(set, mode);
265         } else if (set->set_lockh)
266                 lov_llh_put(set->set_lockh);
267
268         lov_put_reqset(set);
269
270         RETURN(rc ? rc : ret);
271 }
272
273 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
274                          struct ldlm_enqueue_info *einfo,
275                          struct lov_request_set **reqset)
276 {
277         struct lov_obd *lov = &exp->exp_obd->u.lov;
278         struct lov_request_set *set;
279         int i, rc = 0;
280         ENTRY;
281
282         OBD_ALLOC(set, sizeof(*set));
283         if (set == NULL)
284                 RETURN(-ENOMEM);
285         lov_init_set(set);
286
287         set->set_exp = exp;
288         set->set_oi = oinfo;
289         set->set_ei = einfo;
290         set->set_lockh = lov_llh_new(oinfo->oi_md);
291         if (set->set_lockh == NULL)
292                 GOTO(out_set, rc = -ENOMEM);
293         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
294
295         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
296                 struct lov_oinfo *loi;
297                 struct lov_request *req;
298                 obd_off start, end;
299
300                 loi = oinfo->oi_md->lsm_oinfo[i];
301                 if (!lov_stripe_intersects(oinfo->oi_md, i,
302                                            oinfo->oi_policy.l_extent.start,
303                                            oinfo->oi_policy.l_extent.end,
304                                            &start, &end))
305                         continue;
306
307                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
308                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
309                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
310                         continue;
311                 }
312
313                 OBD_ALLOC(req, sizeof(*req));
314                 if (req == NULL)
315                         GOTO(out_set, rc = -ENOMEM);
316
317                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
318                         sizeof(struct lov_oinfo *) +
319                         sizeof(struct lov_oinfo);
320                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
321                 if (req->rq_oi.oi_md == NULL) {
322                         OBD_FREE(req, sizeof(*req));
323                         GOTO(out_set, rc = -ENOMEM);
324                 }
325                 req->rq_oi.oi_md->lsm_oinfo[0] =
326                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
327                         sizeof(struct lov_oinfo *);
328
329                 /* Set lov request specific parameters. */
330                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
331                 req->rq_oi.oi_cb_up = cb_update_enqueue;
332                 req->rq_oi.oi_flags = oinfo->oi_flags;
333
334                 LASSERT(req->rq_oi.oi_lockh);
335
336                 req->rq_oi.oi_policy.l_extent.gid =
337                         oinfo->oi_policy.l_extent.gid;
338                 req->rq_oi.oi_policy.l_extent.start = start;
339                 req->rq_oi.oi_policy.l_extent.end = end;
340
341                 req->rq_idx = loi->loi_ost_idx;
342                 req->rq_stripe = i;
343
344                 /* XXX LOV STACKING: submd should be from the subobj */
345                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
346                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
347                 req->rq_oi.oi_md->lsm_stripe_count = 0;
348                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
349                         loi->loi_kms_valid;
350                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
351                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
352
353                 lov_set_add_req(req, set);
354         }
355         if (!set->set_count)
356                 GOTO(out_set, rc = -EIO);
357         *reqset = set;
358         RETURN(0);
359 out_set:
360         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
361         RETURN(rc);
362 }
363
364 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
365                          int rc)
366 {
367         int ret = rc;
368         ENTRY;
369
370         if (rc > 0)
371                 ret = 0;
372         else if (rc == 0)
373                 ret = 1;
374         lov_update_set(set, req, ret);
375         RETURN(rc);
376 }
377
378 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
379 {
380         int rc = 0;
381         ENTRY;
382
383         if (set == NULL)
384                 RETURN(0);
385         LASSERT(set->set_exp);
386         rc = enqueue_done(set, mode);
387         if ((set->set_count == set->set_success) &&
388             (flags & LDLM_FL_TEST_LOCK))
389                 lov_llh_put(set->set_lockh);
390
391         lov_put_reqset(set);
392
393         RETURN(rc);
394 }
395
396 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
397                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
398                        __u32 mode, struct lustre_handle *lockh,
399                        struct lov_request_set **reqset)
400 {
401         struct lov_obd *lov = &exp->exp_obd->u.lov;
402         struct lov_request_set *set;
403         int i, rc = 0;
404         ENTRY;
405
406         OBD_ALLOC(set, sizeof(*set));
407         if (set == NULL)
408                 RETURN(-ENOMEM);
409         lov_init_set(set);
410
411         set->set_exp = exp;
412         set->set_oi = oinfo;
413         set->set_oi->oi_md = lsm;
414         set->set_lockh = lov_llh_new(lsm);
415         if (set->set_lockh == NULL)
416                 GOTO(out_set, rc = -ENOMEM);
417         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
418
419         for (i = 0; i < lsm->lsm_stripe_count; i++){
420                 struct lov_oinfo *loi;
421                 struct lov_request *req;
422                 obd_off start, end;
423
424                 loi = lsm->lsm_oinfo[i];
425                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
426                                            policy->l_extent.end, &start, &end))
427                         continue;
428
429                 /* FIXME raid1 should grace this error */
430                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
431                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
432                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
433                         GOTO(out_set, rc = -EIO);
434                 }
435
436                 OBD_ALLOC(req, sizeof(*req));
437                 if (req == NULL)
438                         GOTO(out_set, rc = -ENOMEM);
439
440                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
441                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
442                 if (req->rq_oi.oi_md == NULL) {
443                         OBD_FREE(req, sizeof(*req));
444                         GOTO(out_set, rc = -ENOMEM);
445                 }
446
447                 req->rq_oi.oi_policy.l_extent.start = start;
448                 req->rq_oi.oi_policy.l_extent.end = end;
449                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
450
451                 req->rq_idx = loi->loi_ost_idx;
452                 req->rq_stripe = i;
453
454                 /* XXX LOV STACKING: submd should be from the subobj */
455                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
456                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
457                 req->rq_oi.oi_md->lsm_stripe_count = 0;
458
459                 lov_set_add_req(req, set);
460         }
461         if (!set->set_count)
462                 GOTO(out_set, rc = -EIO);
463         *reqset = set;
464         RETURN(rc);
465 out_set:
466         lov_fini_match_set(set, mode, 0);
467         RETURN(rc);
468 }
469
470 int lov_fini_cancel_set(struct lov_request_set *set)
471 {
472         int rc = 0;
473         ENTRY;
474
475         if (set == NULL)
476                 RETURN(0);
477
478         LASSERT(set->set_exp);
479         if (set->set_lockh)
480                 lov_llh_put(set->set_lockh);
481
482         lov_put_reqset(set);
483
484         RETURN(rc);
485 }
486
487 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
488                         struct lov_stripe_md *lsm, __u32 mode,
489                         struct lustre_handle *lockh,
490                         struct lov_request_set **reqset)
491 {
492         struct lov_request_set *set;
493         int i, rc = 0;
494         ENTRY;
495
496         OBD_ALLOC(set, sizeof(*set));
497         if (set == NULL)
498                 RETURN(-ENOMEM);
499         lov_init_set(set);
500
501         set->set_exp = exp;
502         set->set_oi = oinfo;
503         set->set_oi->oi_md = lsm;
504         set->set_lockh = lov_handle2llh(lockh);
505         if (set->set_lockh == NULL) {
506                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
507                 GOTO(out_set, rc = -EINVAL);
508         }
509         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
510
511         for (i = 0; i < lsm->lsm_stripe_count; i++){
512                 struct lov_request *req;
513                 struct lustre_handle *lov_lockhp;
514                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
515
516                 lov_lockhp = set->set_lockh->llh_handles + i;
517                 if (!lustre_handle_is_used(lov_lockhp)) {
518                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
519                                loi->loi_ost_idx, loi->loi_id);
520                         continue;
521                 }
522
523                 OBD_ALLOC(req, sizeof(*req));
524                 if (req == NULL)
525                         GOTO(out_set, rc = -ENOMEM);
526
527                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
528                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
529                 if (req->rq_oi.oi_md == NULL) {
530                         OBD_FREE(req, sizeof(*req));
531                         GOTO(out_set, rc = -ENOMEM);
532                 }
533
534                 req->rq_idx = loi->loi_ost_idx;
535                 req->rq_stripe = i;
536
537                 /* XXX LOV STACKING: submd should be from the subobj */
538                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
539                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
540                 req->rq_oi.oi_md->lsm_stripe_count = 0;
541
542                 lov_set_add_req(req, set);
543         }
544         if (!set->set_count)
545                 GOTO(out_set, rc = -EIO);
546         *reqset = set;
547         RETURN(rc);
548 out_set:
549         lov_fini_cancel_set(set);
550         RETURN(rc);
551 }
552
553 static int create_done(struct obd_export *exp, struct lov_request_set *set,
554                        struct lov_stripe_md **lsmp)
555 {
556         struct lov_obd *lov = &exp->exp_obd->u.lov;
557         struct obd_trans_info *oti = set->set_oti;
558         struct obdo *src_oa = set->set_oi->oi_oa;
559         struct lov_request *req;
560         struct obdo *ret_oa = NULL;
561         int attrset = 0, rc = 0;
562         ENTRY;
563
564         LASSERT(set->set_completes);
565
566         /* try alloc objects on other osts if osc_create fails for
567          * exceptions: RPC failure, ENOSPC, etc */
568         if (set->set_count != set->set_success) {
569                 cfs_list_for_each_entry (req, &set->set_list, rq_link) {
570                         if (req->rq_rc == 0)
571                                 continue;
572
573                         set->set_completes--;
574                         req->rq_complete = 0;
575
576                         rc = qos_remedy_create(set, req);
577                         lov_update_create_set(set, req, rc);
578                 }
579         }
580
581         /* no successful creates */
582         if (set->set_success == 0)
583                 GOTO(cleanup, rc);
584
585         if (set->set_count != set->set_success) {
586                 set->set_count = set->set_success;
587                 qos_shrink_lsm(set);
588         }
589
590         OBDO_ALLOC(ret_oa);
591         if (ret_oa == NULL)
592                 GOTO(cleanup, rc = -ENOMEM);
593
594         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
595                 if (!req->rq_complete || req->rq_rc)
596                         continue;
597                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
598                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
599                                 req->rq_stripe, &attrset);
600         }
601         if (src_oa->o_valid & OBD_MD_FLSIZE &&
602             ret_oa->o_size != src_oa->o_size) {
603                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
604                        src_oa->o_size, ret_oa->o_size);
605                 LBUG();
606         }
607         ret_oa->o_id = src_oa->o_id;
608         ret_oa->o_seq = src_oa->o_seq;
609         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
610         memcpy(src_oa, ret_oa, sizeof(*src_oa));
611         OBDO_FREE(ret_oa);
612
613         *lsmp = set->set_oi->oi_md;
614         GOTO(done, rc = 0);
615
616 cleanup:
617         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
618                 struct obd_export *sub_exp;
619                 int err = 0;
620
621                 if (!req->rq_complete || req->rq_rc)
622                         continue;
623
624                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
625                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
626                                   NULL);
627                 if (err)
628                         CERROR("Failed to uncreate objid "LPX64" subobj "
629                                LPX64" on OST idx %d: rc = %d\n",
630                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
631                                req->rq_idx, rc);
632         }
633         if (*lsmp == NULL)
634                 obd_free_memmd(exp, &set->set_oi->oi_md);
635 done:
636         if (oti && set->set_cookies) {
637                 oti->oti_logcookies = set->set_cookies;
638                 if (!set->set_cookie_sent) {
639                         oti_free_cookies(oti);
640                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
641                 } else {
642                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
643                 }
644         }
645         RETURN(rc);
646 }
647
648 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
649 {
650         int rc = 0;
651         ENTRY;
652
653         if (set == NULL)
654                 RETURN(0);
655         LASSERT(set->set_exp);
656         if (set->set_completes)
657                 rc = create_done(set->set_exp, set, lsmp);
658
659         lov_put_reqset(set);
660         RETURN(rc);
661 }
662
663 int lov_update_create_set(struct lov_request_set *set,
664                           struct lov_request *req, int rc)
665 {
666         struct obd_trans_info *oti = set->set_oti;
667         struct lov_stripe_md *lsm = set->set_oi->oi_md;
668         struct lov_oinfo *loi;
669         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
670         ENTRY;
671
672         if (rc && lov->lov_tgts[req->rq_idx] &&
673             lov->lov_tgts[req->rq_idx]->ltd_active) {
674                 CERROR("error creating fid "LPX64" sub-object"
675                        " on OST idx %d/%d: rc = %d\n",
676                        set->set_oi->oi_oa->o_id, req->rq_idx,
677                        lsm->lsm_stripe_count, rc);
678                 if (rc > 0) {
679                         CERROR("obd_create returned invalid err %d\n", rc);
680                         rc = -EIO;
681                 }
682         }
683
684         cfs_spin_lock(&set->set_lock);
685         req->rq_stripe = set->set_success;
686         loi = lsm->lsm_oinfo[req->rq_stripe];
687         if (rc) {
688                 lov_update_set(set, req, rc);
689                 cfs_spin_unlock(&set->set_lock);
690                 RETURN(rc);
691         }
692
693         loi->loi_id = req->rq_oi.oi_oa->o_id;
694         loi->loi_seq = req->rq_oi.oi_oa->o_seq;
695         loi->loi_ost_idx = req->rq_idx;
696         loi_init(loi);
697
698         if (oti && set->set_cookies)
699                 ++oti->oti_logcookies;
700         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
701                 set->set_cookie_sent++;
702
703         lov_update_set(set, req, rc);
704         cfs_spin_unlock(&set->set_lock);
705
706         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
707                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
708         RETURN(rc);
709 }
710
711 int cb_create_update(void *cookie, int rc)
712 {
713         struct obd_info *oinfo = cookie;
714         struct lov_request *lovreq;
715
716         lovreq = container_of(oinfo, struct lov_request, rq_oi);
717         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
718         if (lov_finished_set(lovreq->rq_rqset))
719                 lov_put_reqset(lovreq->rq_rqset);
720         return rc;
721 }
722
723
724 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
725                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
726                         struct obd_trans_info *oti,
727                         struct lov_request_set **reqset)
728 {
729         struct lov_request_set *set;
730         int rc = 0;
731         ENTRY;
732
733         OBD_ALLOC(set, sizeof(*set));
734         if (set == NULL)
735                 RETURN(-ENOMEM);
736         lov_init_set(set);
737
738         set->set_exp = exp;
739         set->set_oi = oinfo;
740         set->set_oi->oi_md = *lsmp;
741         set->set_oi->oi_oa = src_oa;
742         set->set_oti = oti;
743         lov_get_reqset(set);
744
745         rc = qos_prep_create(exp, set);
746         /* qos_shrink_lsm() may have allocated a new lsm */
747         *lsmp = oinfo->oi_md;
748         if (rc) {
749                 lov_fini_create_set(set, lsmp);
750                 lov_put_reqset(set);
751         } else {
752                 *reqset = set;
753         }
754         RETURN(rc);
755 }
756
757 static int common_attr_done(struct lov_request_set *set)
758 {
759         cfs_list_t *pos;
760         struct lov_request *req;
761         struct obdo *tmp_oa;
762         int rc = 0, attrset = 0;
763         ENTRY;
764
765         LASSERT(set->set_oi != NULL);
766
767         if (set->set_oi->oi_oa == NULL)
768                 RETURN(0);
769
770         if (!set->set_success)
771                 RETURN(-EIO);
772
773         OBDO_ALLOC(tmp_oa);
774         if (tmp_oa == NULL)
775                 GOTO(out, rc = -ENOMEM);
776
777         cfs_list_for_each (pos, &set->set_list) {
778                 req = cfs_list_entry(pos, struct lov_request, rq_link);
779
780                 if (!req->rq_complete || req->rq_rc)
781                         continue;
782                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
783                         continue;
784                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
785                                 req->rq_oi.oi_oa->o_valid,
786                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
787         }
788         if (!attrset) {
789                 CERROR("No stripes had valid attrs\n");
790                 rc = -EIO;
791         }
792         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
793             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
794                 /* When we take attributes of some epoch, we require all the
795                  * ost to be active. */
796                 CERROR("Not all the stripes had valid attrs\n");
797                 GOTO(out, rc = -EIO);
798         }
799
800         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
801         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
802 out:
803         if (tmp_oa)
804                 OBDO_FREE(tmp_oa);
805         RETURN(rc);
806
807 }
808
809 static int brw_done(struct lov_request_set *set)
810 {
811         struct lov_stripe_md *lsm = set->set_oi->oi_md;
812         struct lov_oinfo     *loi = NULL;
813         cfs_list_t *pos;
814         struct lov_request *req;
815         ENTRY;
816
817         cfs_list_for_each (pos, &set->set_list) {
818                 req = cfs_list_entry(pos, struct lov_request, rq_link);
819
820                 if (!req->rq_complete || req->rq_rc)
821                         continue;
822
823                 loi = lsm->lsm_oinfo[req->rq_stripe];
824
825                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
826                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
827         }
828
829         RETURN(0);
830 }
831
832 int lov_fini_brw_set(struct lov_request_set *set)
833 {
834         int rc = 0;
835         ENTRY;
836
837         if (set == NULL)
838                 RETURN(0);
839         LASSERT(set->set_exp);
840         if (set->set_completes) {
841                 rc = brw_done(set);
842                 /* FIXME update qos data here */
843         }
844         lov_put_reqset(set);
845
846         RETURN(rc);
847 }
848
849 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
850                      obd_count oa_bufs, struct brw_page *pga,
851                      struct obd_trans_info *oti,
852                      struct lov_request_set **reqset)
853 {
854         struct {
855                 obd_count       index;
856                 obd_count       count;
857                 obd_count       off;
858         } *info = NULL;
859         struct lov_request_set *set;
860         struct lov_obd *lov = &exp->exp_obd->u.lov;
861         int rc = 0, i, shift;
862         ENTRY;
863
864         OBD_ALLOC(set, sizeof(*set));
865         if (set == NULL)
866                 RETURN(-ENOMEM);
867         lov_init_set(set);
868
869         set->set_exp = exp;
870         set->set_oti = oti;
871         set->set_oi = oinfo;
872         set->set_oabufs = oa_bufs;
873         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
874         if (!set->set_pga)
875                 GOTO(out, rc = -ENOMEM);
876
877         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
878         if (!info)
879                 GOTO(out, rc = -ENOMEM);
880
881         /* calculate the page count for each stripe */
882         for (i = 0; i < oa_bufs; i++) {
883                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
884                 info[stripe].count++;
885         }
886
887         /* alloc and initialize lov request */
888         shift = 0;
889         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
890                 struct lov_oinfo *loi = NULL;
891                 struct lov_request *req;
892
893                 if (info[i].count == 0)
894                         continue;
895
896                 loi = oinfo->oi_md->lsm_oinfo[i];
897                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
898                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
899                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
900                         GOTO(out, rc = -EIO);
901                 }
902
903                 OBD_ALLOC(req, sizeof(*req));
904                 if (req == NULL)
905                         GOTO(out, rc = -ENOMEM);
906
907                 OBDO_ALLOC(req->rq_oi.oi_oa);
908                 if (req->rq_oi.oi_oa == NULL) {
909                         OBD_FREE(req, sizeof(*req));
910                         GOTO(out, rc = -ENOMEM);
911                 }
912
913                 if (oinfo->oi_oa) {
914                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
915                                sizeof(*req->rq_oi.oi_oa));
916                 }
917                 req->rq_oi.oi_oa->o_id = loi->loi_id;
918                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
919                 req->rq_oi.oi_oa->o_stripe_idx = i;
920
921                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
922                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
923                 if (req->rq_oi.oi_md == NULL) {
924                         OBDO_FREE(req->rq_oi.oi_oa);
925                         OBD_FREE(req, sizeof(*req));
926                         GOTO(out, rc = -ENOMEM);
927                 }
928
929                 req->rq_idx = loi->loi_ost_idx;
930                 req->rq_stripe = i;
931
932                 /* XXX LOV STACKING */
933                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
934                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
935                 req->rq_oabufs = info[i].count;
936                 req->rq_pgaidx = shift;
937                 shift += req->rq_oabufs;
938
939                 /* remember the index for sort brw_page array */
940                 info[i].index = req->rq_pgaidx;
941
942                 req->rq_oi.oi_capa = oinfo->oi_capa;
943
944                 lov_set_add_req(req, set);
945         }
946         if (!set->set_count)
947                 GOTO(out, rc = -EIO);
948
949         /* rotate & sort the brw_page array */
950         for (i = 0; i < oa_bufs; i++) {
951                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
952
953                 shift = info[stripe].index + info[stripe].off;
954                 LASSERT(shift < oa_bufs);
955                 set->set_pga[shift] = pga[i];
956                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
957                                   &set->set_pga[shift].off);
958                 info[stripe].off++;
959         }
960 out:
961         if (info)
962                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
963
964         if (rc == 0)
965                 *reqset = set;
966         else
967                 lov_fini_brw_set(set);
968
969         RETURN(rc);
970 }
971
972 int lov_fini_getattr_set(struct lov_request_set *set)
973 {
974         int rc = 0;
975         ENTRY;
976
977         if (set == NULL)
978                 RETURN(0);
979         LASSERT(set->set_exp);
980         if (set->set_completes)
981                 rc = common_attr_done(set);
982
983         lov_put_reqset(set);
984
985         RETURN(rc);
986 }
987
988 /* The callback for osc_getattr_async that finilizes a request info when a
989  * response is received. */
990 static int cb_getattr_update(void *cookie, int rc)
991 {
992         struct obd_info *oinfo = cookie;
993         struct lov_request *lovreq;
994         lovreq = container_of(oinfo, struct lov_request, rq_oi);
995         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
996 }
997
998 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
999                          struct lov_request_set **reqset)
1000 {
1001         struct lov_request_set *set;
1002         struct lov_obd *lov = &exp->exp_obd->u.lov;
1003         int rc = 0, i;
1004         ENTRY;
1005
1006         OBD_ALLOC(set, sizeof(*set));
1007         if (set == NULL)
1008                 RETURN(-ENOMEM);
1009         lov_init_set(set);
1010
1011         set->set_exp = exp;
1012         set->set_oi = oinfo;
1013
1014         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1015                 struct lov_oinfo *loi;
1016                 struct lov_request *req;
1017
1018                 loi = oinfo->oi_md->lsm_oinfo[i];
1019                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1020                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1021                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1022                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
1023                                 /* SOM requires all the OSTs to be active. */
1024                                 GOTO(out_set, rc = -EIO);
1025                         continue;
1026                 }
1027
1028                 OBD_ALLOC(req, sizeof(*req));
1029                 if (req == NULL)
1030                         GOTO(out_set, rc = -ENOMEM);
1031
1032                 req->rq_stripe = i;
1033                 req->rq_idx = loi->loi_ost_idx;
1034
1035                 OBDO_ALLOC(req->rq_oi.oi_oa);
1036                 if (req->rq_oi.oi_oa == NULL) {
1037                         OBD_FREE(req, sizeof(*req));
1038                         GOTO(out_set, rc = -ENOMEM);
1039                 }
1040                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1041                        sizeof(*req->rq_oi.oi_oa));
1042                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1043                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1044                 req->rq_oi.oi_cb_up = cb_getattr_update;
1045                 req->rq_oi.oi_capa = oinfo->oi_capa;
1046
1047                 lov_set_add_req(req, set);
1048         }
1049         if (!set->set_count)
1050                 GOTO(out_set, rc = -EIO);
1051         *reqset = set;
1052         RETURN(rc);
1053 out_set:
1054         lov_fini_getattr_set(set);
1055         RETURN(rc);
1056 }
1057
1058 int lov_fini_destroy_set(struct lov_request_set *set)
1059 {
1060         ENTRY;
1061
1062         if (set == NULL)
1063                 RETURN(0);
1064         LASSERT(set->set_exp);
1065         if (set->set_completes) {
1066                 /* FIXME update qos data here */
1067         }
1068
1069         lov_put_reqset(set);
1070
1071         RETURN(0);
1072 }
1073
1074 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1075                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1076                          struct obd_trans_info *oti,
1077                          struct lov_request_set **reqset)
1078 {
1079         struct lov_request_set *set;
1080         struct lov_obd *lov = &exp->exp_obd->u.lov;
1081         int rc = 0, i;
1082         ENTRY;
1083
1084         OBD_ALLOC(set, sizeof(*set));
1085         if (set == NULL)
1086                 RETURN(-ENOMEM);
1087         lov_init_set(set);
1088
1089         set->set_exp = exp;
1090         set->set_oi = oinfo;
1091         set->set_oi->oi_md = lsm;
1092         set->set_oi->oi_oa = src_oa;
1093         set->set_oti = oti;
1094         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1095                 set->set_cookies = oti->oti_logcookies;
1096
1097         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1098                 struct lov_oinfo *loi;
1099                 struct lov_request *req;
1100
1101                 loi = lsm->lsm_oinfo[i];
1102                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1103                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1104                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1105                         continue;
1106                 }
1107
1108                 OBD_ALLOC(req, sizeof(*req));
1109                 if (req == NULL)
1110                         GOTO(out_set, rc = -ENOMEM);
1111
1112                 req->rq_stripe = i;
1113                 req->rq_idx = loi->loi_ost_idx;
1114
1115                 OBDO_ALLOC(req->rq_oi.oi_oa);
1116                 if (req->rq_oi.oi_oa == NULL) {
1117                         OBD_FREE(req, sizeof(*req));
1118                         GOTO(out_set, rc = -ENOMEM);
1119                 }
1120                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1121                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1122                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1123                 lov_set_add_req(req, set);
1124         }
1125         if (!set->set_count)
1126                 GOTO(out_set, rc = -EIO);
1127         *reqset = set;
1128         RETURN(rc);
1129 out_set:
1130         lov_fini_destroy_set(set);
1131         RETURN(rc);
1132 }
1133
1134 int lov_fini_setattr_set(struct lov_request_set *set)
1135 {
1136         int rc = 0;
1137         ENTRY;
1138
1139         if (set == NULL)
1140                 RETURN(0);
1141         LASSERT(set->set_exp);
1142         if (set->set_completes) {
1143                 rc = common_attr_done(set);
1144                 /* FIXME update qos data here */
1145         }
1146
1147         lov_put_reqset(set);
1148         RETURN(rc);
1149 }
1150
1151 int lov_update_setattr_set(struct lov_request_set *set,
1152                            struct lov_request *req, int rc)
1153 {
1154         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1155         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1156         ENTRY;
1157
1158         lov_update_set(set, req, rc);
1159
1160         /* grace error on inactive ost */
1161         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1162                     lov->lov_tgts[req->rq_idx]->ltd_active))
1163                 rc = 0;
1164
1165         if (rc == 0) {
1166                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1167                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1168                                 req->rq_oi.oi_oa->o_ctime;
1169                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1170                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1171                                 req->rq_oi.oi_oa->o_mtime;
1172                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1173                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1174                                 req->rq_oi.oi_oa->o_atime;
1175         }
1176
1177         RETURN(rc);
1178 }
1179
1180 /* The callback for osc_setattr_async that finilizes a request info when a
1181  * response is received. */
1182 static int cb_setattr_update(void *cookie, int rc)
1183 {
1184         struct obd_info *oinfo = cookie;
1185         struct lov_request *lovreq;
1186         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1187         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1188 }
1189
1190 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1191                          struct obd_trans_info *oti,
1192                          struct lov_request_set **reqset)
1193 {
1194         struct lov_request_set *set;
1195         struct lov_obd *lov = &exp->exp_obd->u.lov;
1196         int rc = 0, i;
1197         ENTRY;
1198
1199         OBD_ALLOC(set, sizeof(*set));
1200         if (set == NULL)
1201                 RETURN(-ENOMEM);
1202         lov_init_set(set);
1203
1204         set->set_exp = exp;
1205         set->set_oti = oti;
1206         set->set_oi = oinfo;
1207         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1208                 set->set_cookies = oti->oti_logcookies;
1209
1210         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1211                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1212                 struct lov_request *req;
1213
1214                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1215                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1216                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1217                         continue;
1218                 }
1219
1220                 OBD_ALLOC(req, sizeof(*req));
1221                 if (req == NULL)
1222                         GOTO(out_set, rc = -ENOMEM);
1223                 req->rq_stripe = i;
1224                 req->rq_idx = loi->loi_ost_idx;
1225
1226                 OBDO_ALLOC(req->rq_oi.oi_oa);
1227                 if (req->rq_oi.oi_oa == NULL) {
1228                         OBD_FREE(req, sizeof(*req));
1229                         GOTO(out_set, rc = -ENOMEM);
1230                 }
1231                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1232                        sizeof(*req->rq_oi.oi_oa));
1233                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1234                 req->rq_oi.oi_oa->o_seq= loi->loi_seq;
1235                 req->rq_oi.oi_oa->o_stripe_idx = i;
1236                 req->rq_oi.oi_cb_up = cb_setattr_update;
1237                 req->rq_oi.oi_capa = oinfo->oi_capa;
1238
1239                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1240                         int off = lov_stripe_offset(oinfo->oi_md,
1241                                                     oinfo->oi_oa->o_size, i,
1242                                                     &req->rq_oi.oi_oa->o_size);
1243
1244                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1245                                 req->rq_oi.oi_oa->o_size--;
1246
1247                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1248                                i, req->rq_oi.oi_oa->o_size,
1249                                oinfo->oi_oa->o_size);
1250                 }
1251                 lov_set_add_req(req, set);
1252         }
1253         if (!set->set_count)
1254                 GOTO(out_set, rc = -EIO);
1255         *reqset = set;
1256         RETURN(rc);
1257 out_set:
1258         lov_fini_setattr_set(set);
1259         RETURN(rc);
1260 }
1261
1262 int lov_fini_punch_set(struct lov_request_set *set)
1263 {
1264         int rc = 0;
1265         ENTRY;
1266
1267         if (set == NULL)
1268                 RETURN(0);
1269         LASSERT(set->set_exp);
1270         if (set->set_completes) {
1271                 rc = -EIO;
1272                 /* FIXME update qos data here */
1273                 if (set->set_success)
1274                         rc = common_attr_done(set);
1275         }
1276
1277         lov_put_reqset(set);
1278
1279         RETURN(rc);
1280 }
1281
1282 int lov_update_punch_set(struct lov_request_set *set,
1283                          struct lov_request *req, int rc)
1284 {
1285         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1286         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1287         ENTRY;
1288
1289         lov_update_set(set, req, rc);
1290
1291         /* grace error on inactive ost */
1292         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1293                 rc = 0;
1294
1295         if (rc == 0) {
1296                 lov_stripe_lock(lsm);
1297                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1298                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1299                                 req->rq_oi.oi_oa->o_blocks;
1300                 }
1301
1302                 /* Do we need to update lvb_size here? It needn't because
1303                  * it have been done in ll_truncate(). -jay */
1304                 lov_stripe_unlock(lsm);
1305         }
1306
1307         RETURN(rc);
1308 }
1309
1310 /* The callback for osc_punch that finilizes a request info when a response
1311  * is received. */
1312 static int cb_update_punch(void *cookie, int rc)
1313 {
1314         struct obd_info *oinfo = cookie;
1315         struct lov_request *lovreq;
1316         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1317         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1318 }
1319
1320 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1321                        struct obd_trans_info *oti,
1322                        struct lov_request_set **reqset)
1323 {
1324         struct lov_request_set *set;
1325         struct lov_obd *lov = &exp->exp_obd->u.lov;
1326         int rc = 0, i;
1327         ENTRY;
1328
1329         OBD_ALLOC(set, sizeof(*set));
1330         if (set == NULL)
1331                 RETURN(-ENOMEM);
1332         lov_init_set(set);
1333
1334         set->set_oi = oinfo;
1335         set->set_exp = exp;
1336
1337         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1338                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1339                 struct lov_request *req;
1340                 obd_off rs, re;
1341
1342                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1343                                            oinfo->oi_policy.l_extent.start,
1344                                            oinfo->oi_policy.l_extent.end,
1345                                            &rs, &re))
1346                         continue;
1347
1348                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1349                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1350                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1351                         GOTO(out_set, rc = -EIO);
1352                 }
1353
1354                 OBD_ALLOC(req, sizeof(*req));
1355                 if (req == NULL)
1356                         GOTO(out_set, rc = -ENOMEM);
1357                 req->rq_stripe = i;
1358                 req->rq_idx = loi->loi_ost_idx;
1359
1360                 OBDO_ALLOC(req->rq_oi.oi_oa);
1361                 if (req->rq_oi.oi_oa == NULL) {
1362                         OBD_FREE(req, sizeof(*req));
1363                         GOTO(out_set, rc = -ENOMEM);
1364                 }
1365                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1366                        sizeof(*req->rq_oi.oi_oa));
1367                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1368                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1369                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1370
1371                 req->rq_oi.oi_oa->o_stripe_idx = i;
1372                 req->rq_oi.oi_cb_up = cb_update_punch;
1373
1374                 req->rq_oi.oi_policy.l_extent.start = rs;
1375                 req->rq_oi.oi_policy.l_extent.end = re;
1376                 req->rq_oi.oi_policy.l_extent.gid = -1;
1377
1378                 req->rq_oi.oi_capa = oinfo->oi_capa;
1379
1380                 lov_set_add_req(req, set);
1381         }
1382         if (!set->set_count)
1383                 GOTO(out_set, rc = -EIO);
1384         *reqset = set;
1385         RETURN(rc);
1386 out_set:
1387         lov_fini_punch_set(set);
1388         RETURN(rc);
1389 }
1390
1391 int lov_fini_sync_set(struct lov_request_set *set)
1392 {
1393         int rc = 0;
1394         ENTRY;
1395
1396         if (set == NULL)
1397                 RETURN(0);
1398         LASSERT(set->set_exp);
1399         if (set->set_completes) {
1400                 if (!set->set_success)
1401                         rc = -EIO;
1402                 /* FIXME update qos data here */
1403         }
1404
1405         lov_put_reqset(set);
1406
1407         RETURN(rc);
1408 }
1409
1410 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1411                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1412                       obd_off start, obd_off end,
1413                       struct lov_request_set **reqset)
1414 {
1415         struct lov_request_set *set;
1416         struct lov_obd *lov = &exp->exp_obd->u.lov;
1417         int rc = 0, i;
1418         ENTRY;
1419
1420         OBD_ALLOC(set, sizeof(*set));
1421         if (set == NULL)
1422                 RETURN(-ENOMEM);
1423         lov_init_set(set);
1424
1425         set->set_exp = exp;
1426         set->set_oi = oinfo;
1427         set->set_oi->oi_md = lsm;
1428         set->set_oi->oi_oa = src_oa;
1429
1430         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1431                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1432                 struct lov_request *req;
1433                 obd_off rs, re;
1434
1435                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1436                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1437                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1438                         continue;
1439                 }
1440
1441                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1442                         continue;
1443
1444                 OBD_ALLOC(req, sizeof(*req));
1445                 if (req == NULL)
1446                         GOTO(out_set, rc = -ENOMEM);
1447                 req->rq_stripe = i;
1448                 req->rq_idx = loi->loi_ost_idx;
1449
1450                 OBDO_ALLOC(req->rq_oi.oi_oa);
1451                 if (req->rq_oi.oi_oa == NULL) {
1452                         OBD_FREE(req, sizeof(*req));
1453                         GOTO(out_set, rc = -ENOMEM);
1454                 }
1455                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1456                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1457                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1458                 req->rq_oi.oi_oa->o_stripe_idx = i;
1459
1460                 req->rq_oi.oi_policy.l_extent.start = rs;
1461                 req->rq_oi.oi_policy.l_extent.end = re;
1462                 req->rq_oi.oi_policy.l_extent.gid = -1;
1463
1464                 lov_set_add_req(req, set);
1465         }
1466         if (!set->set_count)
1467                 GOTO(out_set, rc = -EIO);
1468         *reqset = set;
1469         RETURN(rc);
1470 out_set:
1471         lov_fini_sync_set(set);
1472         RETURN(rc);
1473 }
1474
/* Largest value representable in a __u64. */
#define LOV_U64_MAX (~(__u64)0)

/* Saturating add: tot += add, clamped to LOV_U64_MAX when the unsigned
 * sum wraps around.  Both arguments are evaluated more than once, so they
 * must be free of side effects. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1483
1484 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1485 {
1486         ENTRY;
1487
1488         if (success) {
1489                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1490
1491                 if (osfs->os_files != LOV_U64_MAX)
1492                         do_div(osfs->os_files, expected_stripes);
1493                 if (osfs->os_ffree != LOV_U64_MAX)
1494                         do_div(osfs->os_ffree, expected_stripes);
1495
1496                 cfs_spin_lock(&obd->obd_osfs_lock);
1497                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1498                 obd->obd_osfs_age = cfs_time_current_64();
1499                 cfs_spin_unlock(&obd->obd_osfs_lock);
1500                 RETURN(0);
1501         }
1502
1503         RETURN(-EIO);
1504 }
1505
1506 int lov_fini_statfs_set(struct lov_request_set *set)
1507 {
1508         int rc = 0;
1509         ENTRY;
1510
1511         if (set == NULL)
1512                 RETURN(0);
1513
1514         if (set->set_completes) {
1515                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1516                                      set->set_success);
1517         }
1518         lov_put_reqset(set);
1519         RETURN(rc);
1520 }
1521
1522 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1523                        int success)
1524 {
1525         int shift = 0, quit = 0;
1526         __u64 tmp;
1527
1528         if (success == 0) {
1529                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1530         } else {
1531                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1532                         /* assume all block sizes are always powers of 2 */
1533                         /* get the bits difference */
1534                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1535                         for (shift = 0; shift <= 64; ++shift) {
1536                                 if (tmp & 1) {
1537                                         if (quit)
1538                                                 break;
1539                                         else
1540                                                 quit = 1;
1541                                         shift = 0;
1542                                 }
1543                                 tmp >>= 1;
1544                         }
1545                 }
1546
1547                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1548                         osfs->os_bsize = lov_sfs->os_bsize;
1549
1550                         osfs->os_bfree  >>= shift;
1551                         osfs->os_bavail >>= shift;
1552                         osfs->os_blocks >>= shift;
1553                 } else if (shift != 0) {
1554                         lov_sfs->os_bfree  >>= shift;
1555                         lov_sfs->os_bavail >>= shift;
1556                         lov_sfs->os_blocks >>= shift;
1557                 }
1558 #ifdef MIN_DF
1559                 /* Sandia requested that df (and so, statfs) only
1560                    returned minimal available space on
1561                    a single OST, so people would be able to
1562                    write this much data guaranteed. */
1563                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1564                         /* Presumably if new bavail is smaller,
1565                            new bfree is bigger as well */
1566                         osfs->os_bfree = lov_sfs->os_bfree;
1567                         osfs->os_bavail = lov_sfs->os_bavail;
1568                 }
1569 #else
1570                 osfs->os_bfree += lov_sfs->os_bfree;
1571                 osfs->os_bavail += lov_sfs->os_bavail;
1572 #endif
1573                 osfs->os_blocks += lov_sfs->os_blocks;
1574                 /* XXX not sure about this one - depends on policy.
1575                  *   - could be minimum if we always stripe on all OBDs
1576                  *     (but that would be wrong for any other policy,
1577                  *     if one of the OBDs has no more objects left)
1578                  *   - could be sum if we stripe whole objects
1579                  *   - could be average, just to give a nice number
1580                  *
1581                  * To give a "reasonable" (if not wholly accurate)
1582                  * number, we divide the total number of free objects
1583                  * by expected stripe count (watch out for overflow).
1584                  */
1585                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1586                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1587         }
1588 }
1589
1590 /* The callback for osc_statfs_async that finilizes a request info when a
1591  * response is received. */
1592 static int cb_statfs_update(void *cookie, int rc)
1593 {
1594         struct obd_info *oinfo = cookie;
1595         struct lov_request *lovreq;
1596         struct obd_statfs *osfs, *lov_sfs;
1597         struct lov_obd *lov;
1598         struct lov_tgt_desc *tgt;
1599         struct obd_device *lovobd, *tgtobd;
1600         int success;
1601         ENTRY;
1602
1603         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1604         lovobd = lovreq->rq_rqset->set_obd;
1605         lov = &lovobd->u.lov;
1606         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1607         lov_sfs = oinfo->oi_osfs;
1608         success = lovreq->rq_rqset->set_success;
1609         /* XXX: the same is done in lov_update_common_set, however
1610            lovset->set_exp is not initialized. */
1611         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1612         if (rc)
1613                 GOTO(out, rc);
1614
1615         obd_getref(lovobd);
1616         tgt = lov->lov_tgts[lovreq->rq_idx];
1617         if (!tgt || !tgt->ltd_active)
1618                 GOTO(out_update, rc);
1619
1620         tgtobd = class_exp2obd(tgt->ltd_exp);
1621         cfs_spin_lock(&tgtobd->obd_osfs_lock);
1622         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1623         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1624                 tgtobd->obd_osfs_age = cfs_time_current_64();
1625         cfs_spin_unlock(&tgtobd->obd_osfs_lock);
1626
1627 out_update:
1628         lov_update_statfs(osfs, lov_sfs, success);
1629         qos_update(lov);
1630         obd_putref(lovobd);
1631
1632 out:
1633         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1634             lov_finished_set(lovreq->rq_rqset)) {
1635                lov_statfs_interpret(NULL, lovreq->rq_rqset,
1636                                     lovreq->rq_rqset->set_success !=
1637                                                   lovreq->rq_rqset->set_count);
1638                qos_statfs_done(lov);
1639         }
1640
1641         RETURN(0);
1642 }
1643
1644 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1645                         struct lov_request_set **reqset)
1646 {
1647         struct lov_request_set *set;
1648         struct lov_obd *lov = &obd->u.lov;
1649         int rc = 0, i;
1650         ENTRY;
1651
1652         OBD_ALLOC(set, sizeof(*set));
1653         if (set == NULL)
1654                 RETURN(-ENOMEM);
1655         lov_init_set(set);
1656
1657         set->set_obd = obd;
1658         set->set_oi = oinfo;
1659
1660         /* We only get block data from the OBD */
1661         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1662                 struct lov_request *req;
1663
1664                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1665                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1666                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1667                         continue;
1668                 }
1669
1670                 /* skip targets that have been explicitely disabled by the
1671                  * administrator */
1672                 if (!lov->lov_tgts[i]->ltd_exp) {
1673                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1674                         continue;
1675                 }
1676
1677                 OBD_ALLOC(req, sizeof(*req));
1678                 if (req == NULL)
1679                         GOTO(out_set, rc = -ENOMEM);
1680
1681                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1682                 if (req->rq_oi.oi_osfs == NULL) {
1683                         OBD_FREE(req, sizeof(*req));
1684                         GOTO(out_set, rc = -ENOMEM);
1685                 }
1686
1687                 req->rq_idx = i;
1688                 req->rq_oi.oi_cb_up = cb_statfs_update;
1689                 req->rq_oi.oi_flags = oinfo->oi_flags;
1690
1691                 lov_set_add_req(req, set);
1692         }
1693         if (!set->set_count)
1694                 GOTO(out_set, rc = -EIO);
1695         *reqset = set;
1696         RETURN(rc);
1697 out_set:
1698         lov_fini_statfs_set(set);
1699         RETURN(rc);
1700 }