Whamcloud - gitweb
b702f7ff52c68e5df7fbfa0432ff029ced3565b8
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #include <libcfs/libcfs.h>
32 #else
33 #include <liblustre.h>
34 #endif
35
36 #include <obd_class.h>
37 #include <obd_lov.h>
38 #include <lustre/lustre_idl.h>
39
40 #include "lov_internal.h"
41
42 static void lov_init_set(struct lov_request_set *set)
43 {
44         set->set_count = 0;
45         set->set_completes = 0;
46         set->set_success = 0;
47         set->set_cookies = 0;
48         CFS_INIT_LIST_HEAD(&set->set_list);
49         atomic_set(&set->set_refcount, 1);
50 }
51
52 static void lov_finish_set(struct lov_request_set *set)
53 {
54         struct list_head *pos, *n;
55         ENTRY;
56
57         LASSERT(set);
58         list_for_each_safe(pos, n, &set->set_list) {
59                 struct lov_request *req = list_entry(pos, struct lov_request,
60                                                      rq_link);
61                 list_del_init(&req->rq_link);
62
63                 if (req->rq_oi.oi_oa)
64                         OBDO_FREE(req->rq_oi.oi_oa);
65                 if (req->rq_oi.oi_md)
66                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
67                 if (req->rq_oi.oi_osfs)
68                         OBD_FREE(req->rq_oi.oi_osfs,
69                                  sizeof(*req->rq_oi.oi_osfs));
70                 OBD_FREE(req, sizeof(*req));
71         }
72
73         if (set->set_pga) {
74                 int len = set->set_oabufs * sizeof(*set->set_pga);
75                 OBD_FREE(set->set_pga, len);
76         }
77         if (set->set_lockh)
78                 lov_llh_put(set->set_lockh);
79
80         OBD_FREE(set, sizeof(*set));
81         EXIT;
82 }
83
84 void lov_update_set(struct lov_request_set *set,
85                     struct lov_request *req, int rc)
86 {
87         req->rq_complete = 1;
88         req->rq_rc = rc;
89
90         set->set_completes++;
91         if (rc == 0)
92                 set->set_success++;
93 }
94
95 int lov_update_common_set(struct lov_request_set *set,
96                           struct lov_request *req, int rc)
97 {
98         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
99         ENTRY;
100
101         lov_update_set(set, req, rc);
102
103         /* grace error on inactive ost */
104         if (rc && !(lov->lov_tgts[req->rq_idx] && 
105                     lov->lov_tgts[req->rq_idx]->ltd_active))
106                 rc = 0;
107
108         /* FIXME in raid1 regime, should return 0 */
109         RETURN(rc);
110 }
111
112 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
113 {
114         list_add_tail(&req->rq_link, &set->set_list);
115         set->set_count++;
116 }
117
118 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
119 {
120         struct lov_request_set *set = req->rq_rqset;
121         struct lustre_handle *lov_lockhp;
122         struct lov_oinfo *loi;
123         ENTRY;
124
125         LASSERT(set != NULL);
126         LASSERT(set->set_oi != NULL);
127
128         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
129         loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
130
131         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
132          * and that copy can be arbitrarily out of date.
133          *
134          * The LOV API is due for a serious rewriting anyways, and this
135          * can be addressed then. */
136
137         if (rc == ELDLM_OK) {
138                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
139                 __u64 tmp;
140
141                 LASSERT(lock != NULL);
142                 lov_stripe_lock(set->set_oi->oi_md);
143                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
144                 tmp = loi->loi_lvb.lvb_size;
145                 /* Extend KMS up to the end of this lock and no further
146                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
147                 if (tmp > lock->l_policy_data.l_extent.end)
148                         tmp = lock->l_policy_data.l_extent.end + 1;
149                 if (tmp >= loi->loi_kms) {
150                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
151                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
152                         loi->loi_kms = tmp;
153                         loi->loi_kms_valid = 1;
154                 } else {
155                         LDLM_DEBUG(lock, "lock acquired, setting rss="
156                                    LPU64"; leaving kms="LPU64", end="LPU64,
157                                    loi->loi_lvb.lvb_size, loi->loi_kms,
158                                    lock->l_policy_data.l_extent.end);
159                 }
160                 lov_stripe_unlock(set->set_oi->oi_md);
161                 ldlm_lock_allow_match(lock);
162                 LDLM_LOCK_PUT(lock);
163         } else if ((rc == ELDLM_LOCK_ABORTED) &&
164                    (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
165                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
166                 lov_stripe_lock(set->set_oi->oi_md);
167                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
168                 lov_stripe_unlock(set->set_oi->oi_md);
169                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
170                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
171                 rc = ELDLM_OK;
172         } else {
173                 struct obd_export *exp = set->set_exp;
174                 struct lov_obd *lov = &exp->exp_obd->u.lov;
175
176                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
177                 if (lov->lov_tgts[req->rq_idx] && 
178                     lov->lov_tgts[req->rq_idx]->ltd_active) {
179                         if (rc != -EINTR)
180                                 CERROR("enqueue objid "LPX64" subobj "
181                                        LPX64" on OST idx %d: rc %d\n",
182                                        set->set_oi->oi_md->lsm_object_id,
183                                        loi->loi_id, loi->loi_ost_idx, rc);
184                 } else {
185                         rc = ELDLM_OK;
186                 }
187         }
188         lov_update_set(set, req, rc);
189         RETURN(rc);
190 }
191
192 /* The callback for osc_enqueue that updates lov info for every OSC request. */
193 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
194 {
195         struct ldlm_enqueue_info *einfo;
196         struct lov_request *lovreq;
197
198         lovreq = container_of(oinfo, struct lov_request, rq_oi);
199         einfo = lovreq->rq_rqset->set_ei;
200         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
201 }
202
203 static int enqueue_done(struct lov_request_set *set, __u32 mode)
204 {
205         struct lov_request *req;
206         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
207         int rc = 0;
208         ENTRY;
209
210         /* enqueue/match success, just return */
211         if (set->set_completes && set->set_completes == set->set_success)
212                 RETURN(0);
213
214         /* cancel enqueued/matched locks */
215         list_for_each_entry(req, &set->set_list, rq_link) {
216                 struct lustre_handle *lov_lockhp;
217
218                 if (!req->rq_complete || req->rq_rc)
219                         continue;
220
221                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
222                 LASSERT(lov_lockhp);
223                 if (!lustre_handle_is_used(lov_lockhp))
224                         continue;
225
226                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
227                                 req->rq_oi.oi_md, mode, lov_lockhp);
228                 if (rc && lov->lov_tgts[req->rq_idx] &&
229                     lov->lov_tgts[req->rq_idx]->ltd_active)
230                         CERROR("cancelling obdjid "LPX64" on OST "
231                                "idx %d error: rc = %d\n",
232                                req->rq_oi.oi_md->lsm_object_id,
233                                req->rq_idx, rc);
234         }
235         if (set->set_lockh)
236                 lov_llh_put(set->set_lockh);
237         RETURN(rc);
238 }
239
240 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
241                          struct ptlrpc_request_set *rqset)
242 {
243         int ret = 0;
244         ENTRY;
245
246         if (set == NULL)
247                 RETURN(0);
248         LASSERT(set->set_exp);
249         /* Do enqueue_done only for sync requests and if any request
250          * succeeded. */
251         if (!rqset) {
252                 if (rc)
253                         set->set_completes = 0;
254                 ret = enqueue_done(set, mode);
255         } else if (set->set_lockh)
256                 lov_llh_put(set->set_lockh);
257
258         if (atomic_dec_and_test(&set->set_refcount))
259                 lov_finish_set(set);
260
261         RETURN(rc ? rc : ret);
262 }
263
264 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
265                          struct ldlm_enqueue_info *einfo,
266                          struct lov_request_set **reqset)
267 {
268         struct lov_obd *lov = &exp->exp_obd->u.lov;
269         struct lov_request_set *set;
270         int i, rc = 0;
271         ENTRY;
272
273         OBD_ALLOC(set, sizeof(*set));
274         if (set == NULL)
275                 RETURN(-ENOMEM);
276         lov_init_set(set);
277
278         set->set_exp = exp;
279         set->set_oi = oinfo;
280         set->set_ei = einfo;
281         set->set_lockh = lov_llh_new(oinfo->oi_md);
282         if (set->set_lockh == NULL)
283                 GOTO(out_set, rc = -ENOMEM);
284         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
285
286         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
287                 struct lov_oinfo *loi;
288                 struct lov_request *req;
289                 obd_off start, end;
290
291                 loi = oinfo->oi_md->lsm_oinfo[i];
292                 if (!lov_stripe_intersects(oinfo->oi_md, i,
293                                            oinfo->oi_policy.l_extent.start,
294                                            oinfo->oi_policy.l_extent.end,
295                                            &start, &end))
296                         continue;
297
298                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
299                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
300                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
301                         continue;
302                 }
303
304                 OBD_ALLOC(req, sizeof(*req));
305                 if (req == NULL)
306                         GOTO(out_set, rc = -ENOMEM);
307
308                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
309                         sizeof(struct lov_oinfo *) +
310                         sizeof(struct lov_oinfo);
311                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
312                 if (req->rq_oi.oi_md == NULL) {
313                         OBD_FREE(req, sizeof(*req));
314                         GOTO(out_set, rc = -ENOMEM);
315                 }
316                 req->rq_oi.oi_md->lsm_oinfo[0] =
317                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
318                         sizeof(struct lov_oinfo *);
319
320
321                 req->rq_rqset = set;
322                 /* Set lov request specific parameters. */
323                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
324                 req->rq_oi.oi_cb_up = cb_update_enqueue;
325                 req->rq_oi.oi_flags = oinfo->oi_flags;
326
327                 LASSERT(req->rq_oi.oi_lockh);
328
329                 req->rq_oi.oi_policy.l_extent.gid =
330                         oinfo->oi_policy.l_extent.gid;
331                 req->rq_oi.oi_policy.l_extent.start = start;
332                 req->rq_oi.oi_policy.l_extent.end = end;
333
334                 req->rq_idx = loi->loi_ost_idx;
335                 req->rq_stripe = i;
336
337                 /* XXX LOV STACKING: submd should be from the subobj */
338                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
339                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
340                 req->rq_oi.oi_md->lsm_stripe_count = 0;
341                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
342                         loi->loi_kms_valid;
343                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
344                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
345
346                 lov_set_add_req(req, set);
347         }
348         if (!set->set_count)
349                 GOTO(out_set, rc = -EIO);
350         *reqset = set;
351         RETURN(0);
352 out_set:
353         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
354         RETURN(rc);
355 }
356
357 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
358                          int rc)
359 {
360         int ret = rc;
361         ENTRY;
362
363         if (rc == 1)
364                 ret = 0;
365         else if (rc == 0)
366                 ret = 1;
367         lov_update_set(set, req, ret);
368         RETURN(rc);
369 }
370
371 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
372 {
373         int rc = 0;
374         ENTRY;
375
376         if (set == NULL)
377                 RETURN(0);
378         LASSERT(set->set_exp);
379         rc = enqueue_done(set, mode);
380         if ((set->set_count == set->set_success) &&
381             (flags & LDLM_FL_TEST_LOCK))
382                 lov_llh_put(set->set_lockh);
383
384         if (atomic_dec_and_test(&set->set_refcount))
385                 lov_finish_set(set);
386
387         RETURN(rc);
388 }
389
390 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
391                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
392                        __u32 mode, struct lustre_handle *lockh,
393                        struct lov_request_set **reqset)
394 {
395         struct lov_obd *lov = &exp->exp_obd->u.lov;
396         struct lov_request_set *set;
397         int i, rc = 0;
398         ENTRY;
399
400         OBD_ALLOC(set, sizeof(*set));
401         if (set == NULL)
402                 RETURN(-ENOMEM);
403         lov_init_set(set);
404
405         set->set_exp = exp;
406         set->set_oi = oinfo;
407         set->set_oi->oi_md = lsm;
408         set->set_lockh = lov_llh_new(lsm);
409         if (set->set_lockh == NULL)
410                 GOTO(out_set, rc = -ENOMEM);
411         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
412
413         for (i = 0; i < lsm->lsm_stripe_count; i++){
414                 struct lov_oinfo *loi;
415                 struct lov_request *req;
416                 obd_off start, end;
417
418                 loi = lsm->lsm_oinfo[i];
419                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
420                                            policy->l_extent.end, &start, &end))
421                         continue;
422
423                 /* FIXME raid1 should grace this error */
424                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
425                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
426                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
427                         GOTO(out_set, rc = -EIO);
428                 }
429
430                 OBD_ALLOC(req, sizeof(*req));
431                 if (req == NULL)
432                         GOTO(out_set, rc = -ENOMEM);
433
434                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
435                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
436                 if (req->rq_oi.oi_md == NULL) {
437                         OBD_FREE(req, sizeof(*req));
438                         GOTO(out_set, rc = -ENOMEM);
439                 }
440
441                 req->rq_oi.oi_policy.l_extent.start = start;
442                 req->rq_oi.oi_policy.l_extent.end = end;
443                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
444
445                 req->rq_idx = loi->loi_ost_idx;
446                 req->rq_stripe = i;
447
448                 /* XXX LOV STACKING: submd should be from the subobj */
449                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
450                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
451                 req->rq_oi.oi_md->lsm_stripe_count = 0;
452
453                 lov_set_add_req(req, set);
454         }
455         if (!set->set_count)
456                 GOTO(out_set, rc = -EIO);
457         *reqset = set;
458         RETURN(rc);
459 out_set:
460         lov_fini_match_set(set, mode, 0);
461         RETURN(rc);
462 }
463
464 int lov_fini_cancel_set(struct lov_request_set *set)
465 {
466         int rc = 0;
467         ENTRY;
468
469         if (set == NULL)
470                 RETURN(0);
471
472         LASSERT(set->set_exp);
473         if (set->set_lockh)
474                 lov_llh_put(set->set_lockh);
475
476         if (atomic_dec_and_test(&set->set_refcount))
477                 lov_finish_set(set);
478
479         RETURN(rc);
480 }
481
482 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
483                         struct lov_stripe_md *lsm, __u32 mode,
484                         struct lustre_handle *lockh,
485                         struct lov_request_set **reqset)
486 {
487         struct lov_request_set *set;
488         int i, rc = 0;
489         ENTRY;
490
491         OBD_ALLOC(set, sizeof(*set));
492         if (set == NULL)
493                 RETURN(-ENOMEM);
494         lov_init_set(set);
495
496         set->set_exp = exp;
497         set->set_oi = oinfo;
498         set->set_oi->oi_md = lsm;
499         set->set_lockh = lov_handle2llh(lockh);
500         if (set->set_lockh == NULL) {
501                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
502                 GOTO(out_set, rc = -EINVAL);
503         }
504         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
505
506         for (i = 0; i < lsm->lsm_stripe_count; i++){
507                 struct lov_request *req;
508                 struct lustre_handle *lov_lockhp;
509                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
510
511                 lov_lockhp = set->set_lockh->llh_handles + i;
512                 if (!lustre_handle_is_used(lov_lockhp)) {
513                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
514                                loi->loi_ost_idx, loi->loi_id);
515                         continue;
516                 }
517
518                 OBD_ALLOC(req, sizeof(*req));
519                 if (req == NULL)
520                         GOTO(out_set, rc = -ENOMEM);
521
522                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
523                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
524                 if (req->rq_oi.oi_md == NULL) {
525                         OBD_FREE(req, sizeof(*req));
526                         GOTO(out_set, rc = -ENOMEM);
527                 }
528
529                 req->rq_idx = loi->loi_ost_idx;
530                 req->rq_stripe = i;
531
532                 /* XXX LOV STACKING: submd should be from the subobj */
533                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
534                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
535                 req->rq_oi.oi_md->lsm_stripe_count = 0;
536
537                 lov_set_add_req(req, set);
538         }
539         if (!set->set_count)
540                 GOTO(out_set, rc = -EIO);
541         *reqset = set;
542         RETURN(rc);
543 out_set:
544         lov_fini_cancel_set(set);
545         RETURN(rc);
546 }
547
548 static int create_done(struct obd_export *exp, struct lov_request_set *set,
549                        struct lov_stripe_md **lsmp)
550 {
551         struct lov_obd *lov = &exp->exp_obd->u.lov;
552         struct obd_trans_info *oti = set->set_oti;
553         struct obdo *src_oa = set->set_oi->oi_oa;
554         struct lov_request *req;
555         struct obdo *ret_oa = NULL;
556         int attrset = 0, rc = 0;
557         ENTRY;
558
559         LASSERT(set->set_completes);
560
561         /* try alloc objects on other osts if osc_create fails for
562          * exceptions: RPC failure, ENOSPC, etc */
563         if (set->set_count != set->set_success) {
564                 list_for_each_entry (req, &set->set_list, rq_link) {
565                         if (req->rq_rc == 0)
566                                 continue;
567
568                         set->set_completes--;
569                         req->rq_complete = 0;
570
571                         rc = qos_remedy_create(set, req);
572                         lov_update_create_set(set, req, rc);
573
574                         if (rc)
575                                 break;
576                 }
577         }
578
579         /* no successful creates */
580         if (set->set_success == 0)
581                 GOTO(cleanup, rc);
582
583         /* If there was an explicit stripe set, fail.  Otherwise, we
584          * got some objects and that's not bad. */
585         if (set->set_count != set->set_success) {
586                 if (*lsmp)
587                         GOTO(cleanup, rc);
588                 set->set_count = set->set_success;
589                 qos_shrink_lsm(set);
590         }
591
592         OBDO_ALLOC(ret_oa);
593         if (ret_oa == NULL)
594                 GOTO(cleanup, rc = -ENOMEM);
595
596         list_for_each_entry(req, &set->set_list, rq_link) {
597                 if (!req->rq_complete || req->rq_rc)
598                         continue;
599                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
600                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
601                                 req->rq_stripe, &attrset);
602         }
603         if (src_oa->o_valid & OBD_MD_FLSIZE &&
604             ret_oa->o_size != src_oa->o_size) {
605                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
606                        src_oa->o_size, ret_oa->o_size);
607                 LBUG();
608         }
609         ret_oa->o_id = src_oa->o_id;
610         ret_oa->o_gr = src_oa->o_gr;
611         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
612         memcpy(src_oa, ret_oa, sizeof(*src_oa));
613         OBDO_FREE(ret_oa);
614
615         *lsmp = set->set_oi->oi_md;
616         GOTO(done, rc = 0);
617
618 cleanup:
619         list_for_each_entry(req, &set->set_list, rq_link) {
620                 struct obd_export *sub_exp;
621                 int err = 0;
622
623                 if (!req->rq_complete || req->rq_rc)
624                         continue;
625
626                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
627                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
628                 if (err)
629                         CERROR("Failed to uncreate objid "LPX64" subobj "
630                                LPX64" on OST idx %d: rc = %d\n",
631                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
632                                req->rq_idx, rc);
633         }
634         if (*lsmp == NULL)
635                 obd_free_memmd(exp, &set->set_oi->oi_md);
636 done:
637         if (oti && set->set_cookies) {
638                 oti->oti_logcookies = set->set_cookies;
639                 if (!set->set_cookie_sent) {
640                         oti_free_cookies(oti);
641                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
642                 } else {
643                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
644                 }
645         }
646         RETURN(rc);
647 }
648
649 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
650 {
651         int rc = 0;
652         ENTRY;
653
654         if (set == NULL)
655                 RETURN(0);
656         LASSERT(set->set_exp);
657         if (set->set_completes)
658                 rc = create_done(set->set_exp, set, lsmp);
659
660         if (atomic_dec_and_test(&set->set_refcount))
661                 lov_finish_set(set);
662
663         RETURN(rc);
664 }
665
666 int lov_update_create_set(struct lov_request_set *set,
667                           struct lov_request *req, int rc)
668 {
669         struct obd_trans_info *oti = set->set_oti;
670         struct lov_stripe_md *lsm = set->set_oi->oi_md;
671         struct lov_oinfo *loi;
672         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
673         ENTRY;
674
675         req->rq_stripe = set->set_success;
676         loi = lsm->lsm_oinfo[req->rq_stripe];
677
678         if (rc && lov->lov_tgts[req->rq_idx] &&
679             lov->lov_tgts[req->rq_idx]->ltd_active) {
680                 CERROR("error creating fid "LPX64" sub-object"
681                        " on OST idx %d/%d: rc = %d\n",
682                        set->set_oi->oi_oa->o_id, req->rq_idx,
683                        lsm->lsm_stripe_count, rc);
684                 if (rc > 0) {
685                         CERROR("obd_create returned invalid err %d\n", rc);
686                         rc = -EIO;
687                 }
688         }
689         lov_update_set(set, req, rc);
690         if (rc)
691                 RETURN(rc);
692
693         if (oti && oti->oti_objid)
694                 oti->oti_objid[req->rq_idx] = req->rq_oi.oi_oa->o_id;
695
696         loi->loi_id = req->rq_oi.oi_oa->o_id;
697         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
698         loi->loi_ost_idx = req->rq_idx;
699         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
700                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
701         loi_init(loi);
702
703         if (oti && set->set_cookies)
704                 ++oti->oti_logcookies;
705         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
706                 set->set_cookie_sent++;
707
708         RETURN(0);
709 }
710
711 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
712                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
713                         struct obd_trans_info *oti,
714                         struct lov_request_set **reqset)
715 {
716         struct lov_request_set *set;
717         int rc = 0;
718         ENTRY;
719
720         OBD_ALLOC(set, sizeof(*set));
721         if (set == NULL)
722                 RETURN(-ENOMEM);
723         lov_init_set(set);
724
725         set->set_exp = exp;
726         set->set_oi = oinfo;
727         set->set_oi->oi_md = *lsmp;
728         set->set_oi->oi_oa = src_oa;
729         set->set_oti = oti;
730
731         rc = qos_prep_create(exp, set);
732         if (rc)
733                 lov_fini_create_set(set, lsmp);
734         else
735                 *reqset = set;
736         RETURN(rc);
737 }
738
739 static int common_attr_done(struct lov_request_set *set)
740 {
741         struct list_head *pos;
742         struct lov_request *req;
743         struct obdo *tmp_oa;
744         int rc = 0, attrset = 0;
745         ENTRY;
746
747         LASSERT(set->set_oi != NULL);
748
749         if (set->set_oi->oi_oa == NULL)
750                 RETURN(0);
751
752         if (!set->set_success)
753                 RETURN(-EIO);
754
755         OBDO_ALLOC(tmp_oa);
756         if (tmp_oa == NULL)
757                 GOTO(out, rc = -ENOMEM);
758
759         list_for_each (pos, &set->set_list) {
760                 req = list_entry(pos, struct lov_request, rq_link);
761
762                 if (!req->rq_complete || req->rq_rc)
763                         continue;
764                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
765                         continue;
766                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
767                                 req->rq_oi.oi_oa->o_valid,
768                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
769         }
770         if (!attrset) {
771                 CERROR("No stripes had valid attrs\n");
772                 rc = -EIO;
773         }
774         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
775         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
776 out:
777         if (tmp_oa)
778                 OBDO_FREE(tmp_oa);
779         RETURN(rc);
780
781 }
782
783 static int brw_done(struct lov_request_set *set)
784 {
785         struct lov_stripe_md *lsm = set->set_oi->oi_md;
786         struct lov_oinfo     *loi = NULL;
787         struct list_head *pos;
788         struct lov_request *req;
789         ENTRY;
790
791         list_for_each (pos, &set->set_list) {
792                 req = list_entry(pos, struct lov_request, rq_link);
793
794                 if (!req->rq_complete || req->rq_rc)
795                         continue;
796
797                 loi = lsm->lsm_oinfo[req->rq_stripe];
798
799                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
800                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
801         }
802
803         RETURN(0);
804 }
805
806 int lov_fini_brw_set(struct lov_request_set *set)
807 {
808         int rc = 0;
809         ENTRY;
810
811         if (set == NULL)
812                 RETURN(0);
813         LASSERT(set->set_exp);
814         if (set->set_completes) {
815                 rc = brw_done(set);
816                 /* FIXME update qos data here */
817         }
818         if (atomic_dec_and_test(&set->set_refcount))
819                 lov_finish_set(set);
820
821         RETURN(rc);
822 }
823
824 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
825                      obd_count oa_bufs, struct brw_page *pga,
826                      struct obd_trans_info *oti,
827                      struct lov_request_set **reqset)
828 {
829         struct {
830                 obd_count       index;
831                 obd_count       count;
832                 obd_count       off;
833         } *info = NULL;
834         struct lov_request_set *set;
835         struct lov_obd *lov = &exp->exp_obd->u.lov;
836         int rc = 0, i, shift;
837         ENTRY;
838
839         OBD_ALLOC(set, sizeof(*set));
840         if (set == NULL)
841                 RETURN(-ENOMEM);
842         lov_init_set(set);
843
844         set->set_exp = exp;
845         set->set_oti = oti;
846         set->set_oi = oinfo;
847         set->set_oabufs = oa_bufs;
848         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
849         if (!set->set_pga)
850                 GOTO(out, rc = -ENOMEM);
851
852         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
853         if (!info)
854                 GOTO(out, rc = -ENOMEM);
855
856         /* calculate the page count for each stripe */
857         for (i = 0; i < oa_bufs; i++) {
858                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
859                 info[stripe].count++;
860         }
861
862         /* alloc and initialize lov request */
863         shift = 0;
864         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
865                 struct lov_oinfo *loi = NULL;
866                 struct lov_request *req;
867
868                 if (info[i].count == 0)
869                         continue;
870                 
871                 loi = oinfo->oi_md->lsm_oinfo[i];
872                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
873                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
874                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
875                         GOTO(out, rc = -EIO);
876                 }
877
878                 OBD_ALLOC(req, sizeof(*req));
879                 if (req == NULL)
880                         GOTO(out, rc = -ENOMEM);
881
882                 OBDO_ALLOC(req->rq_oi.oi_oa);
883                 if (req->rq_oi.oi_oa == NULL) {
884                         OBD_FREE(req, sizeof(*req));
885                         GOTO(out, rc = -ENOMEM);
886                 }
887
888                 if (oinfo->oi_oa) {
889                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
890                                sizeof(*req->rq_oi.oi_oa));
891                 }
892                 req->rq_oi.oi_oa->o_id = loi->loi_id;
893                 req->rq_oi.oi_oa->o_stripe_idx = i;
894
895                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
896                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
897                 if (req->rq_oi.oi_md == NULL) {
898                         OBDO_FREE(req->rq_oi.oi_oa);
899                         OBD_FREE(req, sizeof(*req));
900                         GOTO(out, rc = -ENOMEM);
901                 }
902
903                 req->rq_idx = loi->loi_ost_idx;
904                 req->rq_stripe = i;
905
906                 /* XXX LOV STACKING */
907                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
908                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
909                 req->rq_oabufs = info[i].count;
910                 req->rq_pgaidx = shift;
911                 shift += req->rq_oabufs;
912
913                 /* remember the index for sort brw_page array */
914                 info[i].index = req->rq_pgaidx;
915
916                 req->rq_oi.oi_capa = oinfo->oi_capa;
917
918                 lov_set_add_req(req, set);
919         }
920         if (!set->set_count)
921                 GOTO(out, rc = -EIO);
922
923         /* rotate & sort the brw_page array */
924         for (i = 0; i < oa_bufs; i++) {
925                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
926
927                 shift = info[stripe].index + info[stripe].off;
928                 LASSERT(shift < oa_bufs);
929                 set->set_pga[shift] = pga[i];
930                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
931                                   &set->set_pga[shift].off);
932                 info[stripe].off++;
933         }
934 out:
935         if (info)
936                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
937
938         if (rc == 0)
939                 *reqset = set;
940         else
941                 lov_fini_brw_set(set);
942
943         RETURN(rc);
944 }
945
946 int lov_fini_getattr_set(struct lov_request_set *set)
947 {
948         int rc = 0;
949         ENTRY;
950
951         if (set == NULL)
952                 RETURN(0);
953         LASSERT(set->set_exp);
954         if (set->set_completes)
955                 rc = common_attr_done(set);
956
957         if (atomic_dec_and_test(&set->set_refcount))
958                 lov_finish_set(set);
959
960         RETURN(rc);
961 }
962
963 /* The callback for osc_getattr_async that finilizes a request info when a
964  * response is recieved. */
965 static int cb_getattr_update(struct obd_info *oinfo, int rc)
966 {
967         struct lov_request *lovreq;
968         lovreq = container_of(oinfo, struct lov_request, rq_oi);
969         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
970 }
971
972 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
973                          struct lov_request_set **reqset)
974 {
975         struct lov_request_set *set;
976         struct lov_obd *lov = &exp->exp_obd->u.lov;
977         int rc = 0, i;
978         ENTRY;
979
980         OBD_ALLOC(set, sizeof(*set));
981         if (set == NULL)
982                 RETURN(-ENOMEM);
983         lov_init_set(set);
984
985         set->set_exp = exp;
986         set->set_oi = oinfo;
987
988         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
989                 struct lov_oinfo *loi;
990                 struct lov_request *req;
991
992                 loi = oinfo->oi_md->lsm_oinfo[i];
993                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
994                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
995                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
996                         continue;
997                 }
998
999                 OBD_ALLOC(req, sizeof(*req));
1000                 if (req == NULL)
1001                         GOTO(out_set, rc = -ENOMEM);
1002
1003                 req->rq_stripe = i;
1004                 req->rq_idx = loi->loi_ost_idx;
1005
1006                 OBDO_ALLOC(req->rq_oi.oi_oa);
1007                 if (req->rq_oi.oi_oa == NULL) {
1008                         OBD_FREE(req, sizeof(*req));
1009                         GOTO(out_set, rc = -ENOMEM);
1010                 }
1011                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1012                        sizeof(*req->rq_oi.oi_oa));
1013                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1014                 req->rq_oi.oi_cb_up = cb_getattr_update;
1015                 req->rq_oi.oi_capa = oinfo->oi_capa;
1016                 req->rq_rqset = set;
1017
1018                 lov_set_add_req(req, set);
1019         }
1020         if (!set->set_count)
1021                 GOTO(out_set, rc = -EIO);
1022         *reqset = set;
1023         RETURN(rc);
1024 out_set:
1025         lov_fini_getattr_set(set);
1026         RETURN(rc);
1027 }
1028
1029 int lov_fini_destroy_set(struct lov_request_set *set)
1030 {
1031         ENTRY;
1032
1033         if (set == NULL)
1034                 RETURN(0);
1035         LASSERT(set->set_exp);
1036         if (set->set_completes) {
1037                 /* FIXME update qos data here */
1038         }
1039
1040         if (atomic_dec_and_test(&set->set_refcount))
1041                 lov_finish_set(set);
1042
1043         RETURN(0);
1044 }
1045
1046 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1047                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1048                          struct obd_trans_info *oti,
1049                          struct lov_request_set **reqset)
1050 {
1051         struct lov_request_set *set;
1052         struct lov_obd *lov = &exp->exp_obd->u.lov;
1053         int rc = 0, i;
1054         ENTRY;
1055
1056         OBD_ALLOC(set, sizeof(*set));
1057         if (set == NULL)
1058                 RETURN(-ENOMEM);
1059         lov_init_set(set);
1060
1061         set->set_exp = exp;
1062         set->set_oi = oinfo;
1063         set->set_oi->oi_md = lsm;
1064         set->set_oi->oi_oa = src_oa;
1065         set->set_oti = oti;
1066         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1067                 set->set_cookies = oti->oti_logcookies;
1068
1069         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1070                 struct lov_oinfo *loi;
1071                 struct lov_request *req;
1072
1073                 loi = lsm->lsm_oinfo[i];
1074                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
1075                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1076                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1077                         continue;
1078                 }
1079
1080                 OBD_ALLOC(req, sizeof(*req));
1081                 if (req == NULL)
1082                         GOTO(out_set, rc = -ENOMEM);
1083
1084                 req->rq_stripe = i;
1085                 req->rq_idx = loi->loi_ost_idx;
1086
1087                 OBDO_ALLOC(req->rq_oi.oi_oa);
1088                 if (req->rq_oi.oi_oa == NULL) {
1089                         OBD_FREE(req, sizeof(*req));
1090                         GOTO(out_set, rc = -ENOMEM);
1091                 }
1092                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1093                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1094                 lov_set_add_req(req, set);
1095         }
1096         if (!set->set_count)
1097                 GOTO(out_set, rc = -EIO);
1098         *reqset = set;
1099         RETURN(rc);
1100 out_set:
1101         lov_fini_destroy_set(set);
1102         RETURN(rc);
1103 }
1104
1105 int lov_fini_setattr_set(struct lov_request_set *set)
1106 {
1107         int rc = 0;
1108         ENTRY;
1109
1110         if (set == NULL)
1111                 RETURN(0);
1112         LASSERT(set->set_exp);
1113         if (set->set_completes) {
1114                 rc = common_attr_done(set);
1115                 /* FIXME update qos data here */
1116         }
1117
1118         if (atomic_dec_and_test(&set->set_refcount))
1119                 lov_finish_set(set);
1120         RETURN(rc);
1121 }
1122
1123 int lov_update_setattr_set(struct lov_request_set *set,
1124                            struct lov_request *req, int rc)
1125 {
1126         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1127         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1128         ENTRY;
1129
1130         lov_update_set(set, req, rc);
1131
1132         /* grace error on inactive ost */
1133         if (rc && !(lov->lov_tgts[req->rq_idx] && 
1134                     lov->lov_tgts[req->rq_idx]->ltd_active))
1135                 rc = 0;
1136
1137         if (rc == 0) {
1138                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1139                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1140                                 req->rq_oi.oi_oa->o_ctime;
1141                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1142                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1143                                 req->rq_oi.oi_oa->o_mtime;
1144                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1145                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1146                                 req->rq_oi.oi_oa->o_atime;
1147         }
1148
1149         RETURN(rc);
1150 }
1151
1152 /* The callback for osc_setattr_async that finilizes a request info when a
1153  * response is recieved. */
1154 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1155 {
1156         struct lov_request *lovreq;
1157         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1158         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1159 }
1160
1161 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1162                          struct obd_trans_info *oti,
1163                          struct lov_request_set **reqset)
1164 {
1165         struct lov_request_set *set;
1166         struct lov_obd *lov = &exp->exp_obd->u.lov;
1167         int rc = 0, i;
1168         ENTRY;
1169
1170         OBD_ALLOC(set, sizeof(*set));
1171         if (set == NULL)
1172                 RETURN(-ENOMEM);
1173         lov_init_set(set);
1174
1175         set->set_exp = exp;
1176         set->set_oti = oti;
1177         set->set_oi = oinfo;
1178         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1179                 set->set_cookies = oti->oti_logcookies;
1180
1181         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1182                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1183                 struct lov_request *req;
1184
1185                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1186                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1187                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1188                         continue;
1189                 }
1190
1191                 OBD_ALLOC(req, sizeof(*req));
1192                 if (req == NULL)
1193                         GOTO(out_set, rc = -ENOMEM);
1194                 req->rq_stripe = i;
1195                 req->rq_idx = loi->loi_ost_idx;
1196
1197                 OBDO_ALLOC(req->rq_oi.oi_oa);
1198                 if (req->rq_oi.oi_oa == NULL) {
1199                         OBD_FREE(req, sizeof(*req));
1200                         GOTO(out_set, rc = -ENOMEM);
1201                 }
1202                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1203                        sizeof(*req->rq_oi.oi_oa));
1204                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1205                 LASSERT(!(req->rq_oi.oi_oa->o_valid & OBD_MD_FLGROUP) 
1206                                 || req->rq_oi.oi_oa->o_gr>0);
1207                 req->rq_oi.oi_oa->o_stripe_idx = i;
1208                 req->rq_oi.oi_cb_up = cb_setattr_update;
1209                 req->rq_oi.oi_capa = oinfo->oi_capa;
1210                 req->rq_rqset = set;
1211
1212                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1213                         int off = lov_stripe_offset(oinfo->oi_md,
1214                                                     oinfo->oi_oa->o_size, i,
1215                                                     &req->rq_oi.oi_oa->o_size);
1216
1217                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1218                                 req->rq_oi.oi_oa->o_size--;
1219
1220                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1221                                i, req->rq_oi.oi_oa->o_size,
1222                                oinfo->oi_oa->o_size);
1223                 }
1224                 lov_set_add_req(req, set);
1225         }
1226         if (!set->set_count)
1227                 GOTO(out_set, rc = -EIO);
1228         *reqset = set;
1229         RETURN(rc);
1230 out_set:
1231         lov_fini_setattr_set(set);
1232         RETURN(rc);
1233 }
1234
1235 int lov_fini_punch_set(struct lov_request_set *set)
1236 {
1237         int rc = 0;
1238         ENTRY;
1239
1240         if (set == NULL)
1241                 RETURN(0);
1242         LASSERT(set->set_exp);
1243         if (set->set_completes) {
1244                 rc = -EIO;
1245                 /* FIXME update qos data here */
1246                 if (set->set_success)
1247                         rc = common_attr_done(set);
1248         }
1249
1250         if (atomic_dec_and_test(&set->set_refcount))
1251                 lov_finish_set(set);
1252
1253         RETURN(rc);
1254 }
1255
1256 int lov_update_punch_set(struct lov_request_set *set,
1257                          struct lov_request *req, int rc)
1258 {
1259         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1260         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1261         ENTRY;
1262
1263         lov_update_set(set, req, rc);
1264
1265         /* grace error on inactive ost */
1266         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1267                 rc = 0;
1268
1269         if (rc == 0) {
1270                 lov_stripe_lock(lsm);
1271                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1272                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1273                                 req->rq_oi.oi_oa->o_blocks;
1274                 }
1275
1276                 /* Do we need to update lvb_size here? It needn't because
1277                  * it have been done in ll_truncate(). -jay */
1278                 lov_stripe_unlock(lsm);
1279         }
1280
1281         RETURN(rc);
1282 }
1283
1284 /* The callback for osc_punch that finilizes a request info when a response
1285  * is recieved. */
1286 static int cb_update_punch(struct obd_info *oinfo, int rc)
1287 {
1288         struct lov_request *lovreq;
1289         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1290         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1291 }
1292
1293 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1294                        struct obd_trans_info *oti,
1295                        struct lov_request_set **reqset)
1296 {
1297         struct lov_request_set *set;
1298         struct lov_obd *lov = &exp->exp_obd->u.lov;
1299         int rc = 0, i;
1300         ENTRY;
1301
1302         OBD_ALLOC(set, sizeof(*set));
1303         if (set == NULL)
1304                 RETURN(-ENOMEM);
1305         lov_init_set(set);
1306
1307         set->set_oi = oinfo;
1308         set->set_exp = exp;
1309
1310         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1311                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1312                 struct lov_request *req;
1313                 obd_off rs, re;
1314
1315                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1316                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1317                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1318                         continue;
1319                 }
1320
1321                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1322                                            oinfo->oi_policy.l_extent.start,
1323                                            oinfo->oi_policy.l_extent.end,
1324                                            &rs, &re))
1325                         continue;
1326
1327                 OBD_ALLOC(req, sizeof(*req));
1328                 if (req == NULL)
1329                         GOTO(out_set, rc = -ENOMEM);
1330                 req->rq_stripe = i;
1331                 req->rq_idx = loi->loi_ost_idx;
1332
1333                 OBDO_ALLOC(req->rq_oi.oi_oa);
1334                 if (req->rq_oi.oi_oa == NULL) {
1335                         OBD_FREE(req, sizeof(*req));
1336                         GOTO(out_set, rc = -ENOMEM);
1337                 }
1338                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1339                        sizeof(*req->rq_oi.oi_oa));
1340                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1341                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1342                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1343
1344                 req->rq_oi.oi_oa->o_stripe_idx = i;
1345                 req->rq_oi.oi_cb_up = cb_update_punch;
1346                 req->rq_rqset = set;
1347
1348                 req->rq_oi.oi_policy.l_extent.start = rs;
1349                 req->rq_oi.oi_policy.l_extent.end = re;
1350                 req->rq_oi.oi_policy.l_extent.gid = -1;
1351
1352                 req->rq_oi.oi_capa = oinfo->oi_capa;
1353
1354                 lov_set_add_req(req, set);
1355         }
1356         if (!set->set_count)
1357                 GOTO(out_set, rc = -EIO);
1358         *reqset = set;
1359         RETURN(rc);
1360 out_set:
1361         lov_fini_punch_set(set);
1362         RETURN(rc);
1363 }
1364
1365 int lov_fini_sync_set(struct lov_request_set *set)
1366 {
1367         int rc = 0;
1368         ENTRY;
1369
1370         if (set == NULL)
1371                 RETURN(0);
1372         LASSERT(set->set_exp);
1373         if (set->set_completes) {
1374                 if (!set->set_success)
1375                         rc = -EIO;
1376                 /* FIXME update qos data here */
1377         }
1378
1379         if (atomic_dec_and_test(&set->set_refcount))
1380                 lov_finish_set(set);
1381
1382         RETURN(rc);
1383 }
1384
1385 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1386                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1387                       obd_off start, obd_off end,
1388                       struct lov_request_set **reqset)
1389 {
1390         struct lov_request_set *set;
1391         struct lov_obd *lov = &exp->exp_obd->u.lov;
1392         int rc = 0, i;
1393         ENTRY;
1394
1395         OBD_ALLOC(set, sizeof(*set));
1396         if (set == NULL)
1397                 RETURN(-ENOMEM);
1398         lov_init_set(set);
1399
1400         set->set_exp = exp;
1401         set->set_oi = oinfo;
1402         set->set_oi->oi_md = lsm;
1403         set->set_oi->oi_oa = src_oa;
1404
1405         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1406                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1407                 struct lov_request *req;
1408                 obd_off rs, re;
1409
1410                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1411                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1412                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1413                         continue;
1414                 }
1415
1416                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1417                         continue;
1418
1419                 OBD_ALLOC(req, sizeof(*req));
1420                 if (req == NULL)
1421                         GOTO(out_set, rc = -ENOMEM);
1422                 req->rq_stripe = i;
1423                 req->rq_idx = loi->loi_ost_idx;
1424
1425                 OBDO_ALLOC(req->rq_oi.oi_oa);
1426                 if (req->rq_oi.oi_oa == NULL) {
1427                         OBD_FREE(req, sizeof(*req));
1428                         GOTO(out_set, rc = -ENOMEM);
1429                 }
1430                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1431                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1432                 req->rq_oi.oi_oa->o_stripe_idx = i;
1433
1434                 req->rq_oi.oi_policy.l_extent.start = rs;
1435                 req->rq_oi.oi_policy.l_extent.end = re;
1436                 req->rq_oi.oi_policy.l_extent.gid = -1;
1437
1438                 lov_set_add_req(req, set);
1439         }
1440         if (!set->set_count)
1441                 GOTO(out_set, rc = -EIO);
1442         *reqset = set;
1443         RETURN(rc);
1444 out_set:
1445         lov_fini_sync_set(set);
1446         RETURN(rc);
1447 }
1448
1449 #define LOV_U64_MAX ((__u64)~0ULL)
1450 #define LOV_SUM_MAX(tot, add)                                           \
1451         do {                                                            \
1452                 if ((tot) + (add) < (tot))                              \
1453                         (tot) = LOV_U64_MAX;                            \
1454                 else                                                    \
1455                         (tot) += (add);                                 \
1456         } while(0)
1457
1458 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1459 {
1460         ENTRY;
1461
1462         if (success) {
1463                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1464
1465                 if (osfs->os_files != LOV_U64_MAX)
1466                         do_div(osfs->os_files, expected_stripes);
1467                 if (osfs->os_ffree != LOV_U64_MAX)
1468                         do_div(osfs->os_ffree, expected_stripes);
1469
1470                 spin_lock(&obd->obd_osfs_lock);
1471                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1472                 obd->obd_osfs_age = get_jiffies_64();
1473                 spin_unlock(&obd->obd_osfs_lock);
1474                 RETURN(0);
1475         }
1476
1477         RETURN(-EIO);
1478 }
1479
1480 int lov_fini_statfs_set(struct lov_request_set *set)
1481 {
1482         int rc = 0;
1483         ENTRY;
1484
1485         if (set == NULL)
1486                 RETURN(0);
1487
1488         if (set->set_completes) {
1489                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1490                                      set->set_success);
1491         }
1492
1493         if (atomic_dec_and_test(&set->set_refcount))
1494                 lov_finish_set(set);
1495
1496         RETURN(rc);
1497 }
1498
1499 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1500                        struct obd_statfs *lov_sfs, int success)
1501 {
1502         int shift = 0, quit = 0;
1503         __u64 tmp;
1504         spin_lock(&obd->obd_osfs_lock);
1505         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1506         obd->obd_osfs_age = get_jiffies_64();
1507         spin_unlock(&obd->obd_osfs_lock);
1508
1509         if (success == 0) {
1510                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1511         } else {
1512                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1513                         /* assume all block sizes are always powers of 2 */
1514                         /* get the bits difference */
1515                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1516                         for (shift = 0; shift <= 64; ++shift) {
1517                                 if (tmp & 1) {
1518                                         if (quit)
1519                                                 break;
1520                                         else
1521                                                 quit = 1;
1522                                         shift = 0;
1523                                 }
1524                                 tmp >>= 1;
1525                         }
1526                 }
1527
1528                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1529                         osfs->os_bsize = lov_sfs->os_bsize;
1530
1531                         osfs->os_bfree  >>= shift;
1532                         osfs->os_bavail >>= shift;
1533                         osfs->os_blocks >>= shift;
1534                 } else if (shift != 0) {
1535                         lov_sfs->os_bfree  >>= shift;
1536                         lov_sfs->os_bavail >>= shift;
1537                         lov_sfs->os_blocks >>= shift;
1538                 }
1539 #ifdef MIN_DF
1540                 /* Sandia requested that df (and so, statfs) only
1541                    returned minimal available space on
1542                    a single OST, so people would be able to
1543                    write this much data guaranteed. */
1544                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1545                         /* Presumably if new bavail is smaller,
1546                            new bfree is bigger as well */
1547                         osfs->os_bfree = lov_sfs->os_bfree;
1548                         osfs->os_bavail = lov_sfs->os_bavail;
1549                 }
1550 #else
1551                 osfs->os_bfree += lov_sfs->os_bfree;
1552                 osfs->os_bavail += lov_sfs->os_bavail;
1553 #endif
1554                 osfs->os_blocks += lov_sfs->os_blocks;
1555                 /* XXX not sure about this one - depends on policy.
1556                  *   - could be minimum if we always stripe on all OBDs
1557                  *     (but that would be wrong for any other policy,
1558                  *     if one of the OBDs has no more objects left)
1559                  *   - could be sum if we stripe whole objects
1560                  *   - could be average, just to give a nice number
1561                  *
1562                  * To give a "reasonable" (if not wholly accurate)
1563                  * number, we divide the total number of free objects
1564                  * by expected stripe count (watch out for overflow).
1565                  */
1566                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1567                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1568         }
1569 }
1570
1571 /* The callback for osc_statfs_async that finilizes a request info when a
1572  * response is recieved. */
1573 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1574 {
1575         struct lov_request *lovreq;
1576         struct obd_statfs *osfs, *lov_sfs;
1577         struct obd_device *obd;
1578         struct lov_obd *lov;
1579         int success;
1580         ENTRY;
1581
1582         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1583         lov = &lovreq->rq_rqset->set_obd->u.lov;
1584         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1585
1586         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1587         lov_sfs = oinfo->oi_osfs;
1588
1589         success = lovreq->rq_rqset->set_success;
1590
1591         /* XXX: the same is done in lov_update_common_set, however
1592            lovset->set_exp is not initialized. */
1593         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1594         if (rc) {
1595                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1596                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1597                         rc = 0;
1598                 RETURN(rc);
1599         }
1600
1601         lov_update_statfs(obd, osfs, lov_sfs, success);
1602         qos_update(lov);
1603
1604         RETURN(0);
1605 }
1606
1607 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1608                         struct lov_request_set **reqset)
1609 {
1610         struct lov_request_set *set;
1611         struct lov_obd *lov = &obd->u.lov;
1612         int rc = 0, i;
1613         ENTRY;
1614
1615         OBD_ALLOC(set, sizeof(*set));
1616         if (set == NULL)
1617                 RETURN(-ENOMEM);
1618         lov_init_set(set);
1619
1620         set->set_obd = obd;
1621         set->set_oi = oinfo;
1622
1623         /* We only get block data from the OBD */
1624         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1625                 struct lov_request *req;
1626
1627                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1628                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1629                         continue;
1630                 }
1631
1632                 OBD_ALLOC(req, sizeof(*req));
1633                 if (req == NULL)
1634                         GOTO(out_set, rc = -ENOMEM);
1635
1636                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1637                 if (req->rq_oi.oi_osfs == NULL) {
1638                         OBD_FREE(req, sizeof(*req));
1639                         GOTO(out_set, rc = -ENOMEM);
1640                 }
1641
1642                 req->rq_idx = i;
1643                 req->rq_oi.oi_cb_up = cb_statfs_update;
1644                 req->rq_rqset = set;
1645
1646                 lov_set_add_req(req, set);
1647         }
1648         if (!set->set_count)
1649                 GOTO(out_set, rc = -EIO);
1650         *reqset = set;
1651         RETURN(rc);
1652 out_set:
1653         lov_fini_statfs_set(set);
1654         RETURN(rc);
1655 }