Whamcloud - gitweb
LU-6245 server: remove types abstraction from MDS/MGS code
[fs/lustre-release.git] / lustre / osp / osp_sync.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2015, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osp/osp_sync.c
37  *
38  * Lustre OST Proxy Device
39  *
40  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
41  * Author: Mikhail Pershin <mike.pershin@intel.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <linux/kthread.h>
47 #include <lustre_log.h>
48 #include <lustre_update.h>
49 #include "osp_internal.h"
50
51 static int osp_sync_id_traction_init(struct osp_device *d);
52 static void osp_sync_id_traction_fini(struct osp_device *d);
53 static __u64 osp_sync_id_get(struct osp_device *d, __u64 id);
54 static void osp_sync_remove_from_tracker(struct osp_device *d);
55
56 /*
 * this is the component of OSP implementing synchronization between MDS and
 * OST: it llogs all interesting changes (currently uid/gid change and object
 * destroy) atomically, then makes sure the changes hit OST storage
60  *
61  * we have 4 queues of work:
62  *
63  * the first queue is llog itself, once read a change is stored in 2nd queue
64  * in form of RPC (but RPC isn't fired yet).
65  *
66  * the second queue (opd_syn_waiting_for_commit) holds changes awaiting local
67  * commit. once change is committed locally it migrates onto 3rd queue.
68  *
69  * the third queue (opd_syn_committed_here) holds changes committed locally,
70  * but not sent to OST (as the pipe can be full). once pipe becomes non-full
 * we take a change from the queue and fire the corresponding RPC.
 *
 * once an RPC is reported committed by OST (using the regular last_committed
 * mechanism) the change jumps into the 4th queue (opd_syn_committed_there),
 * now we can cancel the corresponding llog record and release the RPC
76  *
77  * opd_syn_changes is a number of unread llog records (to be processed).
78  * notice this number doesn't include llog records from previous boots.
79  * with OSP_SYN_THRESHOLD we try to batch processing a bit (TO BE IMPLEMENTED)
80  *
81  * opd_syn_rpc_in_progress is a number of requests in 2-4 queues.
82  * we control this with OSP_MAX_IN_PROGRESS so that OSP don't consume
83  * too much memory -- how to deal with 1000th OSTs ? batching could help?
84  *
85  * opd_syn_rpc_in_flight is a number of RPC in flight.
86  * we control this with OSP_MAX_IN_FLIGHT
87  */
88
89 /* XXX: do math to learn reasonable threshold
90  * should it be ~ number of changes fitting bulk? */
91
92 #define OSP_SYN_THRESHOLD       10
93 #define OSP_MAX_IN_FLIGHT       8
94 #define OSP_MAX_IN_PROGRESS     4096
95
96 #define OSP_JOB_MAGIC           0x26112005
97
98 struct osp_job_req_args {
99         /** bytes reserved for ptlrpc_replay_req() */
100         struct ptlrpc_replay_async_args jra_raa;
101         struct list_head                jra_committed_link;
102         struct list_head                jra_inflight_link;
103         __u32                           jra_magic;
104 };
105
106 static inline int osp_sync_running(struct osp_device *d)
107 {
108         return !!(d->opd_syn_thread.t_flags & SVC_RUNNING);
109 }
110
111 /**
112  * Check status: whether OSP thread has stopped
113  *
114  * \param[in] d         OSP device
115  *
116  * \retval 0            still running
117  * \retval 1            stopped
118  */
119 static inline int osp_sync_stopped(struct osp_device *d)
120 {
121         return !!(d->opd_syn_thread.t_flags & SVC_STOPPED);
122 }
123
124 /*
125  ** Check for new changes to sync
126  *
127  * \param[in] d         OSP device
128  *
129  * \retval 1            there are changes
130  * \retval 0            there are no changes
131  */
132 static inline int osp_sync_has_new_job(struct osp_device *d)
133 {
134         return ((d->opd_syn_last_processed_id < d->opd_syn_last_used_id) &&
135                 (d->opd_syn_last_processed_id < d->opd_syn_last_committed_id))
136                 || (d->opd_syn_prev_done == 0);
137 }
138
139 static inline int osp_sync_inflight_conflict(struct osp_device *d,
140                                              struct llog_rec_hdr *h)
141 {
142         struct osp_job_req_args *jra;
143         struct ost_id            ostid;
144         int                      conflict = 0;
145
146         if (h == NULL || h->lrh_type == LLOG_GEN_REC ||
147             list_empty(&d->opd_syn_inflight_list))
148                 return conflict;
149
150         memset(&ostid, 0, sizeof(ostid));
151         switch (h->lrh_type) {
152         case MDS_UNLINK_REC:
153                 ostid_set_seq(&ostid, ((struct llog_unlink_rec *)h)->lur_oseq);
154                 ostid_set_id(&ostid, ((struct llog_unlink_rec *)h)->lur_oid);
155                 break;
156         case MDS_UNLINK64_REC:
157                 fid_to_ostid(&((struct llog_unlink64_rec *)h)->lur_fid, &ostid);
158                 break;
159         case MDS_SETATTR64_REC:
160                 ostid = ((struct llog_setattr64_rec *)h)->lsr_oi;
161                 break;
162         default:
163                 LBUG();
164         }
165
166         spin_lock(&d->opd_syn_lock);
167         list_for_each_entry(jra, &d->opd_syn_inflight_list, jra_inflight_link) {
168                 struct ptlrpc_request   *req;
169                 struct ost_body         *body;
170
171                 LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
172
173                 req = container_of((void *)jra, struct ptlrpc_request,
174                                    rq_async_args);
175                 body = req_capsule_client_get(&req->rq_pill,
176                                               &RMF_OST_BODY);
177                 LASSERT(body);
178
179                 if (memcmp(&ostid, &body->oa.o_oi, sizeof(ostid)) == 0) {
180                         conflict = 1;
181                         break;
182                 }
183         }
184         spin_unlock(&d->opd_syn_lock);
185
186         return conflict;
187 }
188
189 static inline int osp_sync_low_in_progress(struct osp_device *d)
190 {
191         return atomic_read(&d->opd_syn_rpc_in_progress) <
192                 d->opd_syn_max_rpc_in_progress;
193 }
194
195 /**
196  * Check for room in the network pipe to OST
197  *
198  * \param[in] d         OSP device
199  *
200  * \retval 1            there is room
201  * \retval 0            no room, the pipe is full
202  */
203 static inline int osp_sync_low_in_flight(struct osp_device *d)
204 {
205         return atomic_read(&d->opd_syn_rpc_in_flight) <
206                 d->opd_syn_max_rpc_in_flight;
207 }
208
209 /**
210  * Wake up check for the main sync thread
211  *
212  * \param[in] d         OSP device
213  *
214  * \retval 1            time to wake up
215  * \retval 0            no need to wake up
216  */
217 static inline int osp_sync_has_work(struct osp_device *d)
218 {
219         /* has new/old changes and low in-progress? */
220         if (osp_sync_has_new_job(d) && osp_sync_low_in_progress(d) &&
221             osp_sync_low_in_flight(d) && d->opd_imp_connected)
222                 return 1;
223
224         /* has remotely committed? */
225         if (!list_empty(&d->opd_syn_committed_there))
226                 return 1;
227
228         return 0;
229 }
230
231 #define osp_sync_check_for_work(d)                      \
232 {                                                       \
233         if (osp_sync_has_work(d)) {                     \
234                 wake_up(&d->opd_syn_waitq);    \
235         }                                               \
236 }
237
238 void __osp_sync_check_for_work(struct osp_device *d)
239 {
240         osp_sync_check_for_work(d);
241 }
242
243 static inline __u64 osp_sync_correct_id(struct osp_device *d,
244                                         struct llog_rec_hdr *rec)
245 {
246         /*
247          * llog use cyclic store with 32 bit lrh_id
248          * so overflow lrh_id is possible. Range between
249          * last_processed and last_committed is less than
250          * 64745 ^ 2 and less than 2^32 - 1
251          */
252         __u64 correct_id = d->opd_syn_last_committed_id;
253
254         if ((correct_id & 0xffffffffULL) < rec->lrh_id)
255                 correct_id -= 0x100000000ULL;
256
257         correct_id &= ~0xffffffffULL;
258         correct_id |= rec->lrh_id;
259
260         return correct_id;
261 }
262 /**
263  * Check and return ready-for-new status.
264  *
265  * The thread processing llog record uses this function to check whether
266  * it's time to take another record and process it. The number of conditions
267  * must be met: the connection should be ready, RPCs in flight not exceeding
268  * the limit, the record is committed locally, etc (see the lines below).
269  *
270  * \param[in] d         OSP device
271  * \param[in] rec       next llog record to process
272  *
273  * \retval 0            not ready
274  * \retval 1            ready
275  */
276 static inline int osp_sync_can_process_new(struct osp_device *d,
277                                            struct llog_rec_hdr *rec)
278 {
279         LASSERT(d);
280
281         if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
282                 return 0;
283         if (unlikely(osp_sync_inflight_conflict(d, rec)))
284                 return 0;
285         if (!osp_sync_low_in_progress(d))
286                 return 0;
287         if (!osp_sync_low_in_flight(d))
288                 return 0;
289         if (!d->opd_imp_connected)
290                 return 0;
291         if (d->opd_syn_prev_done == 0)
292                 return 1;
293         if (atomic_read(&d->opd_syn_changes) == 0)
294                 return 0;
295         if (rec == NULL ||
296             osp_sync_correct_id(d, rec) <= d->opd_syn_last_committed_id)
297                 return 1;
298         return 0;
299 }
300
301 /**
302  * Declare intention to add a new change.
303  *
304  * With regard to OSD API, we have to declare any changes ahead. In this
305  * case we declare an intention to add a llog record representing the
306  * change on the local storage.
307  *
308  * \param[in] env       LU environment provided by the caller
309  * \param[in] o         OSP object
310  * \param[in] type      type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
311  * \param[in] th        transaction handle (local)
312  *
313  * \retval 0            on success
314  * \retval negative     negated errno on error
315  */
316 int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
317                          llog_op_type type, struct thandle *th)
318 {
319         struct osp_thread_info  *osi = osp_env_info(env);
320         struct osp_device       *d = lu2osp_dev(o->opo_obj.do_lu.lo_dev);
321         struct llog_ctxt        *ctxt;
322         struct thandle          *storage_th;
323         int                      rc;
324
325         ENTRY;
326
327         /* it's a layering violation, to access internals of th,
328          * but we can do this as a sanity check, for a while */
329         LASSERT(th->th_top != NULL);
330         storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
331         if (IS_ERR(storage_th))
332                 RETURN(PTR_ERR(storage_th));
333
334         switch (type) {
335         case MDS_UNLINK64_REC:
336                 osi->osi_hdr.lrh_len = sizeof(struct llog_unlink64_rec);
337                 break;
338         case MDS_SETATTR64_REC:
339                 osi->osi_hdr.lrh_len = sizeof(struct llog_setattr64_rec);
340                 break;
341         default:
342                 LBUG();
343         }
344
345         /* we want ->dt_trans_start() to allocate per-thandle structure */
346         storage_th->th_tags |= LCT_OSP_THREAD;
347
348         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
349         LASSERT(ctxt);
350
351         rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr,
352                               storage_th);
353         llog_ctxt_put(ctxt);
354
355         RETURN(rc);
356 }
357
358 /**
359  * Generate a llog record for a given change.
360  *
361  * Generates a llog record for the change passed. The change can be of two
362  * types: unlink and setattr. The record gets an ID which later will be
363  * used to track commit status of the change. For unlink changes, the caller
364  * can supply a starting FID and the count of the objects to destroy. For
365  * setattr the caller should apply attributes to apply.
366  *
367  *
368  * \param[in] env       LU environment provided by the caller
369  * \param[in] d         OSP device
370  * \param[in] fid       fid of the object the change should be applied to
371  * \param[in] type      type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
372  * \param[in] count     count of objects to destroy
373  * \param[in] th        transaction handle (local)
374  * \param[in] attr      attributes for setattr
375  *
376  * \retval 0            on success
377  * \retval negative     negated errno on error
378  */
379 static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
380                             const struct lu_fid *fid, llog_op_type type,
381                             int count, struct thandle *th,
382                             const struct lu_attr *attr)
383 {
384         struct osp_thread_info  *osi = osp_env_info(env);
385         struct llog_ctxt        *ctxt;
386         struct osp_txn_info     *txn;
387         struct thandle          *storage_th;
388         int                      rc;
389
390         ENTRY;
391
392         /* it's a layering violation, to access internals of th,
393          * but we can do this as a sanity check, for a while */
394         LASSERT(th->th_top != NULL);
395         storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
396         if (IS_ERR(storage_th))
397                 RETURN(PTR_ERR(storage_th));
398
399         switch (type) {
400         case MDS_UNLINK64_REC:
401                 osi->osi_hdr.lrh_len = sizeof(osi->osi_unlink);
402                 osi->osi_hdr.lrh_type = MDS_UNLINK64_REC;
403                 osi->osi_unlink.lur_fid  = *fid;
404                 osi->osi_unlink.lur_count = count;
405                 break;
406         case MDS_SETATTR64_REC:
407                 rc = fid_to_ostid(fid, &osi->osi_oi);
408                 LASSERT(rc == 0);
409                 osi->osi_hdr.lrh_len = sizeof(osi->osi_setattr);
410                 osi->osi_hdr.lrh_type = MDS_SETATTR64_REC;
411                 osi->osi_setattr.lsr_oi  = osi->osi_oi;
412                 LASSERT(attr);
413                 osi->osi_setattr.lsr_uid = attr->la_uid;
414                 osi->osi_setattr.lsr_gid = attr->la_gid;
415                 osi->osi_setattr.lsr_valid =
416                         ((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
417                         ((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0);
418                 break;
419         default:
420                 LBUG();
421         }
422
423         txn = osp_txn_info(&storage_th->th_ctx);
424         LASSERT(txn);
425
426         txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id);
427         osi->osi_hdr.lrh_id = (txn->oti_current_id & 0xffffffffULL);
428         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
429         if (ctxt == NULL)
430                 RETURN(-ENOMEM);
431
432         rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie,
433                       storage_th);
434         llog_ctxt_put(ctxt);
435
436         if (likely(rc >= 0)) {
437                 CDEBUG(D_OTHER, "%s: new record "DOSTID":%lu/%lu: %d\n",
438                        d->opd_obd->obd_name,
439                        POSTID(&osi->osi_cookie.lgc_lgl.lgl_oi),
440                        (unsigned long)osi->osi_cookie.lgc_lgl.lgl_ogen,
441                        (unsigned long)osi->osi_cookie.lgc_index, rc);
442                 atomic_inc(&d->opd_syn_changes);
443         }
444         /* return 0 always here, error case just cause no llog record */
445         RETURN(0);
446 }
447
448 int osp_sync_add(const struct lu_env *env, struct osp_object *o,
449                  llog_op_type type, struct thandle *th,
450                  const struct lu_attr *attr)
451 {
452         return osp_sync_add_rec(env, lu2osp_dev(o->opo_obj.do_lu.lo_dev),
453                                 lu_object_fid(&o->opo_obj.do_lu), type, 1,
454                                 th, attr);
455 }
456
457 int osp_sync_gap(const struct lu_env *env, struct osp_device *d,
458                         struct lu_fid *fid, int lost, struct thandle *th)
459 {
460         return osp_sync_add_rec(env, d, fid, MDS_UNLINK64_REC, lost, th, NULL);
461 }
462
463 /*
464  * it's quite obvious we can't maintain all the structures in the memory:
465  * while OST is down, MDS can be processing thousands and thousands of unlinks
 * filling persistent llogs and in-core representation
467  *
468  * this doesn't scale at all. so we need basically the following:
469  * a) destroy/setattr append llog records
470  * b) once llog has grown to X records, we process first Y committed records
471  *
472  *  once record R is found via llog_process(), it becomes committed after any
473  *  subsequent commit callback (at the most)
474  */
475
476 /**
477  * ptlrpc commit callback.
478  *
479  * The callback is called by PTLRPC when a RPC is reported committed by the
 * target (OST). We register the callback for every RPC applying a change
 * from the llog. This way we know when the llog records can be cancelled.
 * Notice the callback can be called when OSP is finishing. We can detect this
483  * checking that actual transno in the request is less or equal of known
484  * committed transno (see osp_sync_process_committed() for the details).
485  * XXX: this is pretty expensive and can be improved later using batching.
486  *
487  * \param[in] req       request
488  */
489 static void osp_sync_request_commit_cb(struct ptlrpc_request *req)
490 {
491         struct osp_device *d = req->rq_cb_data;
492         struct osp_job_req_args *jra;
493
494         CDEBUG(D_HA, "commit req %p, transno %llu\n", req, req->rq_transno);
495
496         if (unlikely(req->rq_transno == 0))
497                 return;
498
499         /* do not do any opd_dyn_rpc_* accounting here
500          * it's done in osp_sync_interpret sooner or later */
501         LASSERT(d);
502
503         jra = ptlrpc_req_async_args(req);
504         LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
505         LASSERT(list_empty(&jra->jra_committed_link));
506
507         ptlrpc_request_addref(req);
508
509         spin_lock(&d->opd_syn_lock);
510         list_add(&jra->jra_committed_link, &d->opd_syn_committed_there);
511         spin_unlock(&d->opd_syn_lock);
512
513         /* XXX: some batching wouldn't hurt */
514         wake_up(&d->opd_syn_waitq);
515 }
516
517 /**
518  * RPC interpretation callback.
519  *
520  * The callback is called by ptlrpc when RPC is replied. Now we have to decide
521  * whether we should:
522  *  - put request on a special list to wait until it's committed by the target,
523  *    if the request is successful
524  *  - schedule llog record cancel if no target object is found
525  *  - try later (essentially after reboot) in case of unexpected error
526  *
527  * \param[in] env       LU environment provided by the caller
528  * \param[in] req       request replied
529  * \param[in] aa        callback data
530  * \param[in] rc        result of RPC
531  *
532  * \retval 0            always
533  */
534 static int osp_sync_interpret(const struct lu_env *env,
535                               struct ptlrpc_request *req, void *aa, int rc)
536 {
537         struct osp_device *d = req->rq_cb_data;
538         struct osp_job_req_args *jra = aa;
539
540         if (jra->jra_magic != OSP_JOB_MAGIC) {
541                 DEBUG_REQ(D_ERROR, req, "bad magic %u\n", jra->jra_magic);
542                 LBUG();
543         }
544         LASSERT(d);
545
546         CDEBUG(D_HA, "reply req %p/%d, rc %d, transno %u\n", req,
547                atomic_read(&req->rq_refcount),
548                rc, (unsigned) req->rq_transno);
549         LASSERT(rc || req->rq_transno);
550
551         if (rc == -ENOENT) {
552                 /*
553                  * we tried to destroy object or update attributes,
554                  * but object doesn't exist anymore - cancell llog record
555                  */
556                 LASSERT(req->rq_transno == 0);
557                 LASSERT(list_empty(&jra->jra_committed_link));
558
559                 ptlrpc_request_addref(req);
560
561                 spin_lock(&d->opd_syn_lock);
562                 list_add(&jra->jra_committed_link, &d->opd_syn_committed_there);
563                 spin_unlock(&d->opd_syn_lock);
564
565                 wake_up(&d->opd_syn_waitq);
566         } else if (rc) {
567                 struct obd_import *imp = req->rq_import;
568                 /*
569                  * error happened, we'll try to repeat on next boot ?
570                  */
571                 LASSERTF(req->rq_transno == 0 ||
572                          req->rq_import_generation < imp->imp_generation,
573                          "transno %llu, rc %d, gen: req %d, imp %d\n",
574                          req->rq_transno, rc, req->rq_import_generation,
575                          imp->imp_generation);
576                 if (req->rq_transno == 0) {
577                         /* this is the last time we see the request
578                          * if transno is not zero, then commit cb
579                          * will be called at some point */
580                         LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) > 0);
581                         atomic_dec(&d->opd_syn_rpc_in_progress);
582                 }
583
584                 wake_up(&d->opd_syn_waitq);
585         } else if (d->opd_pre != NULL &&
586                    unlikely(d->opd_pre_status == -ENOSPC)) {
587                 /*
588                  * if current status is -ENOSPC (lack of free space on OST)
589                  * then we should poll OST immediately once object destroy
590                  * is replied
591                  */
592                 osp_statfs_need_now(d);
593         }
594
595         spin_lock(&d->opd_syn_lock);
596         list_del_init(&jra->jra_inflight_link);
597         spin_unlock(&d->opd_syn_lock);
598         LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) > 0);
599         atomic_dec(&d->opd_syn_rpc_in_flight);
600         if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
601                 wake_up(&d->opd_syn_barrier_waitq);
602         CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
603                d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
604                atomic_read(&d->opd_syn_rpc_in_progress));
605
606         osp_sync_check_for_work(d);
607
608         return 0;
609 }
610
611 /*
612  ** Add request to ptlrpc queue.
613  *
614  * This is just a tiny helper function to put the request on the sending list
615  *
616  * \param[in] d         OSP device
617  * \param[in] req       request
618  */
619 static void osp_sync_send_new_rpc(struct osp_device *d,
620                                   struct ptlrpc_request *req)
621 {
622         struct osp_job_req_args *jra;
623
624         LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) <=
625                 d->opd_syn_max_rpc_in_flight);
626
627         jra = ptlrpc_req_async_args(req);
628         jra->jra_magic = OSP_JOB_MAGIC;
629         INIT_LIST_HEAD(&jra->jra_committed_link);
630         spin_lock(&d->opd_syn_lock);
631         list_add_tail(&jra->jra_inflight_link, &d->opd_syn_inflight_list);
632         spin_unlock(&d->opd_syn_lock);
633
634         ptlrpcd_add_req(req);
635 }
636
637
638 /**
639  * Allocate and prepare RPC for a new change.
640  *
641  * The function allocates and initializes an RPC which will be sent soon to
642  * apply the change to the target OST. The request is initialized from the
643  * llog record passed. Notice only the fields common to all type of changes
644  * are initialized.
645  *
646  * \param[in] d         OSP device
647  * \param[in] llh       llog handle where the record is stored
648  * \param[in] h         llog record
649  * \param[in] op        type of the change
650  * \param[in] format    request format to be used
651  *
652  * \retval pointer              new request on success
653  * \retval ERR_PTR(errno)       on error
654  */
655 static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
656                                                struct llog_handle *llh,
657                                                struct llog_rec_hdr *h,
658                                                ost_cmd_t op,
659                                                const struct req_format *format)
660 {
661         struct ptlrpc_request   *req;
662         struct ost_body         *body;
663         struct obd_import       *imp;
664         int                      rc;
665
666         /* Prepare the request */
667         imp = d->opd_obd->u.cli.cl_import;
668         LASSERT(imp);
669
670         if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_ENOMEM))
671                 RETURN(ERR_PTR(-ENOMEM));
672
673         req = ptlrpc_request_alloc(imp, format);
674         if (req == NULL)
675                 RETURN(ERR_PTR(-ENOMEM));
676
677         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, op);
678         if (rc) {
679                 ptlrpc_req_finished(req);
680                 return ERR_PTR(rc);
681         }
682
683         /*
684          * this is a trick: to save on memory allocations we put cookie
685          * into the request, but don't set corresponded flag in o_valid
686          * so that OST doesn't interpret this cookie. once the request
687          * is committed on OST we take cookie from the request and cancel
688          */
689         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
690         LASSERT(body);
691         body->oa.o_lcookie.lgc_lgl = llh->lgh_id;
692         body->oa.o_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
693         body->oa.o_lcookie.lgc_index = h->lrh_index;
694
695         req->rq_interpret_reply = osp_sync_interpret;
696         req->rq_commit_cb = osp_sync_request_commit_cb;
697         req->rq_cb_data = d;
698
699         ptlrpc_request_set_replen(req);
700
701         return req;
702 }
703
704 /**
705  * Generate a request for setattr change.
706  *
707  * The function prepares a new RPC, initializes it with setattr specific
708  * bits and send the RPC.
709  *
710  * \param[in] d         OSP device
711  * \param[in] llh       llog handle where the record is stored
712  * \param[in] h         llog record
713  *
714  * \retval 0            on success
715  * \retval 1            on invalid record
716  * \retval negative     negated errno on error
717  */
718 static int osp_sync_new_setattr_job(struct osp_device *d,
719                                     struct llog_handle *llh,
720                                     struct llog_rec_hdr *h)
721 {
722         struct llog_setattr64_rec       *rec = (struct llog_setattr64_rec *)h;
723         struct ptlrpc_request           *req;
724         struct ost_body                 *body;
725
726         ENTRY;
727         LASSERT(h->lrh_type == MDS_SETATTR64_REC);
728
729         if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_INVALID_REC))
730                 RETURN(1);
731         /* lsr_valid can only be 0 or have OBD_MD_{FLUID,FLGID} set,
732          * so no bits other than these should be set. */
733         if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID)) != 0) {
734                 CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
735                        d->opd_obd->obd_name, rec->lsr_valid);
736                 /* return 1 on invalid record */
737                 RETURN(1);
738         }
739
740         req = osp_sync_new_job(d, llh, h, OST_SETATTR, &RQF_OST_SETATTR);
741         if (IS_ERR(req))
742                 RETURN(PTR_ERR(req));
743
744         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
745         LASSERT(body);
746         body->oa.o_oi = rec->lsr_oi;
747         body->oa.o_uid = rec->lsr_uid;
748         body->oa.o_gid = rec->lsr_gid;
749         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
750         /* old setattr record (prior 2.6.0) doesn't have 'valid' stored,
751          * we assume that both UID and GID are valid in that case. */
752         if (rec->lsr_valid == 0)
753                 body->oa.o_valid |= (OBD_MD_FLUID | OBD_MD_FLGID);
754         else
755                 body->oa.o_valid |= rec->lsr_valid;
756
757         osp_sync_send_new_rpc(d, req);
758         RETURN(0);
759 }
760
761 /**
762  * Generate a request for unlink change.
763  *
764  * The function prepares a new RPC, initializes it with unlink(destroy)
765  * specific bits and sends the RPC. The function is used to handle
766  * llog_unlink_rec which were used in the older versions of Lustre.
767  * Current version uses llog_unlink_rec64.
768  *
769  * \param[in] d         OSP device
770  * \param[in] llh       llog handle where the record is stored
771  * \param[in] h         llog record
772  *
773  * \retval 0            on success
774  * \retval negative     negated errno on error
775  */
776 static int osp_sync_new_unlink_job(struct osp_device *d,
777                                    struct llog_handle *llh,
778                                    struct llog_rec_hdr *h)
779 {
780         struct llog_unlink_rec  *rec = (struct llog_unlink_rec *)h;
781         struct ptlrpc_request   *req;
782         struct ost_body         *body;
783
784         ENTRY;
785         LASSERT(h->lrh_type == MDS_UNLINK_REC);
786
787         req = osp_sync_new_job(d, llh, h, OST_DESTROY, &RQF_OST_DESTROY);
788         if (IS_ERR(req))
789                 RETURN(PTR_ERR(req));
790
791         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
792         LASSERT(body);
793         ostid_set_seq(&body->oa.o_oi, rec->lur_oseq);
794         ostid_set_id(&body->oa.o_oi, rec->lur_oid);
795         body->oa.o_misc = rec->lur_count;
796         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
797         if (rec->lur_count)
798                 body->oa.o_valid |= OBD_MD_FLOBJCOUNT;
799
800         osp_sync_send_new_rpc(d, req);
801         RETURN(0);
802 }
803
804 /**
805  * Generate a request for unlink change.
806  *
807  * The function prepares a new RPC, initializes it with unlink(destroy)
808  * specific bits and sends the RPC. Depending on the target (MDT or OST)
809  * two different protocols are used. For MDT we use OUT (basically OSD API
810  * updates transferred via a network). For OST we still use the old
811  * protocol (OBD?), originally for compatibility. Later we can start to
812  * use OUT for OST as well, this will allow batching and better code
813  * unification.
814  *
815  * \param[in] d         OSP device
816  * \param[in] llh       llog handle where the record is stored
817  * \param[in] h         llog record
818  *
819  * \retval 0            on success
820  * \retval negative     negated errno on error
821  */
822 static int osp_sync_new_unlink64_job(struct osp_device *d,
823                                      struct llog_handle *llh,
824                                      struct llog_rec_hdr *h)
825 {
826         struct llog_unlink64_rec        *rec = (struct llog_unlink64_rec *)h;
827         struct ptlrpc_request           *req = NULL;
828         struct ost_body                 *body;
829         int                              rc;
830
831         ENTRY;
832         LASSERT(h->lrh_type == MDS_UNLINK64_REC);
833         req = osp_sync_new_job(d, llh, h, OST_DESTROY,
834                                &RQF_OST_DESTROY);
835         if (IS_ERR(req))
836                 RETURN(PTR_ERR(req));
837
838         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
839         if (body == NULL)
840                 RETURN(-EFAULT);
841         rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi);
842         if (rc < 0)
843                 RETURN(rc);
844         body->oa.o_misc = rec->lur_count;
845         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
846                            OBD_MD_FLOBJCOUNT;
847         osp_sync_send_new_rpc(d, req);
848         RETURN(0);
849 }
850
851 /**
852  * Process llog records.
853  *
854  * This function is called to process the llog records committed locally.
855  * In the recovery model used by OSP we can apply a change to a remote
856  * target once corresponding transaction (like posix unlink) is committed
857  * locally so can't revert.
858  * Depending on the llog record type, a given handler is called that is
859  * responsible for preparing and sending the RPC to apply the change.
860  * Special record type LLOG_GEN_REC marking a reboot is cancelled right away.
861  *
862  * \param[in] env       LU environment provided by the caller
863  * \param[in] d         OSP device
864  * \param[in] llh       llog handle where the record is stored
865  * \param[in] rec       llog record
866  */
867 static void osp_sync_process_record(const struct lu_env *env,
868                                     struct osp_device *d,
869                                     struct llog_handle *llh,
870                                     struct llog_rec_hdr *rec)
871 {
872         struct llog_handle      *cathandle = llh->u.phd.phd_cat_handle;
873         struct llog_cookie       cookie;
874         int                      rc = 0;
875
876         ENTRY;
877
878         cookie.lgc_lgl = llh->lgh_id;
879         cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
880         cookie.lgc_index = rec->lrh_index;
881
882         if (unlikely(rec->lrh_type == LLOG_GEN_REC)) {
883                 struct llog_gen_rec *gen = (struct llog_gen_rec *)rec;
884
885                 /* we're waiting for the record generated by this instance */
886                 LASSERT(d->opd_syn_prev_done == 0);
887                 if (!memcmp(&d->opd_syn_generation, &gen->lgr_gen,
888                             sizeof(gen->lgr_gen))) {
889                         CDEBUG(D_HA, "processed all old entries\n");
890                         d->opd_syn_prev_done = 1;
891                 }
892
893                 /* cancel any generation record */
894                 rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);
895
896                 RETURN_EXIT;
897         }
898
899         /*
900          * now we prepare and fill requests to OST, put them on the queue
901          * and fire after next commit callback
902          */
903
904         /* notice we increment counters before sending RPC, to be consistent
905          * in RPC interpret callback which may happen very quickly */
906         atomic_inc(&d->opd_syn_rpc_in_flight);
907         atomic_inc(&d->opd_syn_rpc_in_progress);
908
909         switch (rec->lrh_type) {
910         /* case MDS_UNLINK_REC is kept for compatibility */
911         case MDS_UNLINK_REC:
912                 rc = osp_sync_new_unlink_job(d, llh, rec);
913                 break;
914         case MDS_UNLINK64_REC:
915                 rc = osp_sync_new_unlink64_job(d, llh, rec);
916                 break;
917         case MDS_SETATTR64_REC:
918                 rc = osp_sync_new_setattr_job(d, llh, rec);
919                 break;
920         default:
921                 CERROR("%s: unknown record type: %x\n", d->opd_obd->obd_name,
922                        rec->lrh_type);
923                 /* treat "unknown record type" as "invalid" */
924                 rc = 1;
925                 break;
926         }
927
928         /* For all kinds of records, not matter successful or not,
929          * we should decrease changes and bump last_processed_id.
930          */
931         if (d->opd_syn_prev_done) {
932                 __u64 correct_id = osp_sync_correct_id(d, rec);
933                 LASSERT(atomic_read(&d->opd_syn_changes) > 0);
934                 LASSERT(correct_id <= d->opd_syn_last_committed_id);
935                 /* NOTE: it's possible to meet same id if
936                  * OST stores few stripes of same file
937                  */
938                 while (1) {
939                         /* another thread may be trying to set new value */
940                         rmb();
941                         if (correct_id > d->opd_syn_last_processed_id) {
942                                 d->opd_syn_last_processed_id = correct_id;
943                                 wake_up(&d->opd_syn_barrier_waitq);
944                         } else
945                                 break;
946                 }
947                 atomic_dec(&d->opd_syn_changes);
948         }
949         if (rc != 0) {
950                 atomic_dec(&d->opd_syn_rpc_in_flight);
951                 atomic_dec(&d->opd_syn_rpc_in_progress);
952         }
953
954         CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
955                d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
956                atomic_read(&d->opd_syn_rpc_in_progress));
957
958         /* Delete the invalid record */
959         if (rc == 1) {
960                 rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);
961                 if (rc != 0)
962                         CERROR("%s: can't delete invalid record: "
963                                "fid = "DFID", rec_id = %u, rc = %d\n",
964                                d->opd_obd->obd_name,
965                                PFID(lu_object_fid(&cathandle->lgh_obj->do_lu)),
966                                rec->lrh_id, rc);
967         }
968
969         CDEBUG(D_HA, "found record %x, %d, idx %u, id %u\n",
970                rec->lrh_type, rec->lrh_len, rec->lrh_index, rec->lrh_id);
971
972         RETURN_EXIT;
973 }
974
975 /**
976  * Cancel llog records for the committed changes.
977  *
978  * The function walks through the list of the committed RPCs and cancels
979  * corresponding llog records. see osp_sync_request_commit_cb() for the
980  * details.
981  *
982  * \param[in] env       LU environment provided by the caller
983  * \param[in] d         OSP device
984  */
985 static void osp_sync_process_committed(const struct lu_env *env,
986                                        struct osp_device *d)
987 {
988         struct obd_device       *obd = d->opd_obd;
989         struct obd_import       *imp = obd->u.cli.cl_import;
990         struct ost_body         *body;
991         struct ptlrpc_request   *req;
992         struct llog_ctxt        *ctxt;
993         struct llog_handle      *llh;
994         struct list_head         list;
995         int                      rc, done = 0;
996
997         ENTRY;
998
999         if (list_empty(&d->opd_syn_committed_there))
1000                 return;
1001
1002         /*
1003          * if current status is -ENOSPC (lack of free space on OST)
1004          * then we should poll OST immediately once object destroy
1005          * is committed.
1006          * notice: we do this upon commit as well because some backends
1007          * (like DMU) do not release space right away.
1008          */
1009         if (d->opd_pre != NULL && unlikely(d->opd_pre_status == -ENOSPC))
1010                 osp_statfs_need_now(d);
1011
1012         /*
1013          * now cancel them all
1014          * XXX: can we improve this using some batching?
1015          *      with batch RPC that'll happen automatically?
1016          * XXX: can we store ctxt in lod_device and save few cycles ?
1017          */
1018         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
1019         LASSERT(ctxt);
1020
1021         llh = ctxt->loc_handle;
1022         LASSERT(llh);
1023
1024         INIT_LIST_HEAD(&list);
1025         spin_lock(&d->opd_syn_lock);
1026         list_splice(&d->opd_syn_committed_there, &list);
1027         INIT_LIST_HEAD(&d->opd_syn_committed_there);
1028         spin_unlock(&d->opd_syn_lock);
1029
1030         while (!list_empty(&list)) {
1031                 struct osp_job_req_args *jra;
1032
1033                 jra = list_entry(list.next, struct osp_job_req_args,
1034                                  jra_committed_link);
1035                 LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
1036                 list_del_init(&jra->jra_committed_link);
1037
1038                 req = container_of((void *)jra, struct ptlrpc_request,
1039                                    rq_async_args);
1040                 body = req_capsule_client_get(&req->rq_pill,
1041                                               &RMF_OST_BODY);
1042                 LASSERT(body);
1043                 /* import can be closing, thus all commit cb's are
1044                  * called we can check committness directly */
1045                 if (req->rq_import_generation == imp->imp_generation) {
1046                         rc = llog_cat_cancel_records(env, llh, 1,
1047                                                      &body->oa.o_lcookie);
1048                         if (rc)
1049                                 CERROR("%s: can't cancel record: %d\n",
1050                                        obd->obd_name, rc);
1051                 } else {
1052                         DEBUG_REQ(D_OTHER, req, "imp_committed = %llu",
1053                                   imp->imp_peer_committed_transno);
1054                 }
1055                 ptlrpc_req_finished(req);
1056                 done++;
1057         }
1058
1059         llog_ctxt_put(ctxt);
1060
1061         LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) >= done);
1062         atomic_sub(done, &d->opd_syn_rpc_in_progress);
1063         CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
1064                d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
1065                atomic_read(&d->opd_syn_rpc_in_progress));
1066
1067         osp_sync_check_for_work(d);
1068
1069         /* wake up the thread if requested to stop:
1070          * it might be waiting for in-progress to complete */
1071         if (unlikely(osp_sync_running(d) == 0))
1072                 wake_up(&d->opd_syn_waitq);
1073
1074         EXIT;
1075 }
1076
1077 /**
1078  * The core of the syncing mechanism.
1079  *
1080  * This is a callback called by the llog processing function. Essentially it
1081  * suspends llog processing until there is a record to process (it's supposed
1082  * to be committed locally). The function handles RPCs committed by the target
1083  * and cancels corresponding llog records.
1084  *
1085  * \param[in] env       LU environment provided by the caller
1086  * \param[in] llh       llog handle we're processing
1087  * \param[in] rec       current llog record
1088  * \param[in] data      callback data containing a pointer to the device
1089  *
1090  * \retval 0                    to ask the caller (llog_process()) to continue
1091  * \retval LLOG_PROC_BREAK      to ask the caller to break
1092  */
1093 static int osp_sync_process_queues(const struct lu_env *env,
1094                                    struct llog_handle *llh,
1095                                    struct llog_rec_hdr *rec,
1096                                    void *data)
1097 {
1098         struct osp_device       *d = data;
1099
1100         do {
1101                 struct l_wait_info lwi = { 0 };
1102
1103                 if (!osp_sync_running(d)) {
1104                         CDEBUG(D_HA, "stop llog processing\n");
1105                         return LLOG_PROC_BREAK;
1106                 }
1107
1108                 /* process requests committed by OST */
1109                 osp_sync_process_committed(env, d);
1110
1111                 /* if we there are changes to be processed and we have
1112                  * resources for this ... do now */
1113                 if (osp_sync_can_process_new(d, rec)) {
1114                         if (llh == NULL) {
1115                                 /* ask llog for another record */
1116                                 CDEBUG(D_HA, "%u changes, %u in progress,"
1117                                        " %u in flight\n",
1118                                        atomic_read(&d->opd_syn_changes),
1119                                        atomic_read(&d->opd_syn_rpc_in_progress),
1120                                        atomic_read(&d->opd_syn_rpc_in_flight));
1121                                 return 0;
1122                         }
1123                         osp_sync_process_record(env, d, llh, rec);
1124                         llh = NULL;
1125                         rec = NULL;
1126                 }
1127
1128                 if (d->opd_syn_last_processed_id == d->opd_syn_last_used_id)
1129                         osp_sync_remove_from_tracker(d);
1130
1131                 l_wait_event(d->opd_syn_waitq,
1132                              !osp_sync_running(d) ||
1133                              osp_sync_can_process_new(d, rec) ||
1134                              !list_empty(&d->opd_syn_committed_there),
1135                              &lwi);
1136         } while (1);
1137 }
1138
1139 /**
1140  * OSP sync thread.
1141  *
1142  * This thread runs llog_cat_process() scanner calling our callback
1143  * to process llog records. in the callback we implement tricky
1144  * state machine as we don't want to start scanning of the llog again
1145  * and again, also we don't want to process too many records and send
1146  * too many RPCs a time. so, depending on current load (num of changes
1147  * being synced to OST) the callback can suspend awaiting for some
1148  * new conditions, like syncs completed.
1149  *
1150  * In order to process llog records left by previous boots and to allow
1151  * llog_process_thread() to find something (otherwise it'd just exit
1152  * immediately) we add a special GENERATATION record on each boot.
1153  *
1154  * \param[in] _arg      a pointer to thread's arguments
1155  *
1156  * \retval 0            on success
1157  * \retval negative     negated errno on error
1158  */
1159 static int osp_sync_thread(void *_arg)
1160 {
1161         struct osp_device       *d = _arg;
1162         struct ptlrpc_thread    *thread = &d->opd_syn_thread;
1163         struct l_wait_info       lwi = { 0 };
1164         struct llog_ctxt        *ctxt;
1165         struct obd_device       *obd = d->opd_obd;
1166         struct llog_handle      *llh;
1167         struct lu_env            env;
1168         int                      rc, count;
1169
1170         ENTRY;
1171
1172         rc = lu_env_init(&env, LCT_LOCAL);
1173         if (rc) {
1174                 CERROR("%s: can't initialize env: rc = %d\n",
1175                        obd->obd_name, rc);
1176                 RETURN(rc);
1177         }
1178
1179         spin_lock(&d->opd_syn_lock);
1180         thread->t_flags = SVC_RUNNING;
1181         spin_unlock(&d->opd_syn_lock);
1182         wake_up(&thread->t_ctl_waitq);
1183
1184         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
1185         if (ctxt == NULL) {
1186                 CERROR("can't get appropriate context\n");
1187                 GOTO(out, rc = -EINVAL);
1188         }
1189
1190         llh = ctxt->loc_handle;
1191         if (llh == NULL) {
1192                 CERROR("can't get llh\n");
1193                 llog_ctxt_put(ctxt);
1194                 GOTO(out, rc = -EINVAL);
1195         }
1196
1197         rc = llog_cat_process(&env, llh, osp_sync_process_queues, d, 0, 0);
1198         if (rc < 0) {
1199                 CERROR("%s: llog process with osp_sync_process_queues "
1200                        "failed: %d\n", d->opd_obd->obd_name, rc);
1201                 GOTO(close, rc);
1202         }
1203         LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK,
1204                  "%u changes, %u in progress, %u in flight: %d\n",
1205                  atomic_read(&d->opd_syn_changes),
1206                  atomic_read(&d->opd_syn_rpc_in_progress),
1207                  atomic_read(&d->opd_syn_rpc_in_flight), rc);
1208
1209         /* we don't expect llog_process_thread() to exit till umount */
1210         LASSERTF(thread->t_flags != SVC_RUNNING,
1211                  "%u changes, %u in progress, %u in flight\n",
1212                  atomic_read(&d->opd_syn_changes),
1213                  atomic_read(&d->opd_syn_rpc_in_progress),
1214                  atomic_read(&d->opd_syn_rpc_in_flight));
1215
1216         /* wait till all the requests are completed */
1217         count = 0;
1218         while (atomic_read(&d->opd_syn_rpc_in_progress) > 0) {
1219                 osp_sync_process_committed(&env, d);
1220
1221                 lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
1222                 rc = l_wait_event(d->opd_syn_waitq,
1223                                   atomic_read(&d->opd_syn_rpc_in_progress) == 0,
1224                                   &lwi);
1225                 if (rc == -ETIMEDOUT)
1226                         count++;
1227                 LASSERTF(count < 10, "%s: %d %d %sempty\n",
1228                          d->opd_obd->obd_name,
1229                          atomic_read(&d->opd_syn_rpc_in_progress),
1230                          atomic_read(&d->opd_syn_rpc_in_flight),
1231                          list_empty(&d->opd_syn_committed_there) ? "" : "!");
1232
1233         }
1234
1235 close:
1236         llog_cat_close(&env, llh);
1237         rc = llog_cleanup(&env, ctxt);
1238         if (rc)
1239                 CERROR("can't cleanup llog: %d\n", rc);
1240 out:
1241         LASSERTF(atomic_read(&d->opd_syn_rpc_in_progress) == 0,
1242                  "%s: %d %d %sempty\n",
1243                  d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_progress),
1244                  atomic_read(&d->opd_syn_rpc_in_flight),
1245                  list_empty(&d->opd_syn_committed_there) ? "" : "!");
1246
1247         thread->t_flags = SVC_STOPPED;
1248
1249         wake_up(&thread->t_ctl_waitq);
1250
1251         lu_env_fini(&env);
1252
1253         RETURN(0);
1254 }
1255
1256 /**
1257  * Initialize llog.
1258  *
1259  * Initializes the llog. Specific llog to be used depends on the type of the
1260  * target OSP represents (OST or MDT). The function adds appends a new llog
1261  * record to mark the place where the records associated with this boot
1262  * start.
1263  *
1264  * \param[in] env       LU environment provided by the caller
1265  * \param[in] d         OSP device
1266  *
1267  * \retval 0            on success
1268  * \retval negative     negated errno on error
1269  */
1270 static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
1271 {
1272         struct osp_thread_info  *osi = osp_env_info(env);
1273         struct lu_fid           *fid = &osi->osi_fid;
1274         struct llog_handle      *lgh = NULL;
1275         struct obd_device       *obd = d->opd_obd;
1276         struct llog_ctxt        *ctxt;
1277         int                     rc;
1278
1279         ENTRY;
1280
1281         LASSERT(obd);
1282
1283         /*
1284          * open llog corresponding to our OST
1285          */
1286         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
1287         obd->obd_lvfs_ctxt.dt = d->opd_storage;
1288
1289         lu_local_obj_fid(fid, LLOG_CATALOGS_OID);
1290
1291         rc = llog_osd_get_cat_list(env, d->opd_storage, d->opd_index, 1,
1292                                    &osi->osi_cid, fid);
1293         if (rc < 0) {
1294                 if (rc != -EFAULT) {
1295                         CERROR("%s: can't get id from catalogs: rc = %d\n",
1296                                obd->obd_name, rc);
1297                         RETURN(rc);
1298                 }
1299
1300                 /* After sparse OST indices is supported, the CATALOG file
1301                  * may become a sparse file that results in failure on
1302                  * reading. Skip this error as the llog will be created
1303                  * later */
1304                 memset(&osi->osi_cid, 0, sizeof(osi->osi_cid));
1305                 rc = 0;
1306         }
1307
1308         CDEBUG(D_INFO, "%s: Init llog for %d - catid "DOSTID":%x\n",
1309                obd->obd_name, d->opd_index,
1310                POSTID(&osi->osi_cid.lci_logid.lgl_oi),
1311                osi->osi_cid.lci_logid.lgl_ogen);
1312
1313         rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT,
1314                         d->opd_storage->dd_lu_dev.ld_obd,
1315                         &osp_mds_ost_orig_logops);
1316         if (rc)
1317                 RETURN(rc);
1318
1319         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
1320         LASSERT(ctxt);
1321
1322         if (likely(logid_id(&osi->osi_cid.lci_logid) != 0)) {
1323                 rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid, NULL,
1324                                LLOG_OPEN_EXISTS);
1325                 /* re-create llog if it is missing */
1326                 if (rc == -ENOENT)
1327                         logid_set_id(&osi->osi_cid.lci_logid, 0);
1328                 else if (rc < 0)
1329                         GOTO(out_cleanup, rc);
1330         }
1331
1332         if (unlikely(logid_id(&osi->osi_cid.lci_logid) == 0)) {
1333                 rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
1334                 if (rc < 0)
1335                         GOTO(out_cleanup, rc);
1336                 osi->osi_cid.lci_logid = lgh->lgh_id;
1337         }
1338
1339         LASSERT(lgh != NULL);
1340         ctxt->loc_handle = lgh;
1341
1342         rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
1343         if (rc)
1344                 GOTO(out_close, rc);
1345
1346         rc = llog_osd_put_cat_list(env, d->opd_storage, d->opd_index, 1,
1347                                    &osi->osi_cid, fid);
1348         if (rc)
1349                 GOTO(out_close, rc);
1350
1351         /*
1352          * put a mark in the llog till which we'll be processing
1353          * old records restless
1354          */
1355         d->opd_syn_generation.mnt_cnt = cfs_time_current();
1356         d->opd_syn_generation.conn_cnt = cfs_time_current();
1357
1358         osi->osi_hdr.lrh_type = LLOG_GEN_REC;
1359         osi->osi_hdr.lrh_len = sizeof(osi->osi_gen);
1360
1361         memcpy(&osi->osi_gen.lgr_gen, &d->opd_syn_generation,
1362                sizeof(osi->osi_gen.lgr_gen));
1363
1364         rc = llog_cat_add(env, lgh, &osi->osi_gen.lgr_hdr, &osi->osi_cookie);
1365         if (rc < 0)
1366                 GOTO(out_close, rc);
1367         llog_ctxt_put(ctxt);
1368         RETURN(0);
1369 out_close:
1370         llog_cat_close(env, lgh);
1371 out_cleanup:
1372         llog_cleanup(env, ctxt);
1373         RETURN(rc);
1374 }
1375
1376 /**
1377  * Cleanup llog used for syncing.
1378  *
1379  * Closes and cleanups the llog. The function is called when the device is
1380  * shutting down.
1381  *
1382  * \param[in] env       LU environment provided by the caller
1383  * \param[in] d         OSP device
1384  */
1385 static void osp_sync_llog_fini(const struct lu_env *env, struct osp_device *d)
1386 {
1387         struct llog_ctxt *ctxt;
1388
1389         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
1390         if (ctxt != NULL)
1391                 llog_cat_close(env, ctxt->loc_handle);
1392         llog_cleanup(env, ctxt);
1393 }
1394
1395 /**
1396  * Initialization of the sync component of OSP.
1397  *
1398  * Initializes the llog and starts a new thread to handle the changes to
1399  * the remote target (OST or MDT).
1400  *
1401  * \param[in] env       LU environment provided by the caller
1402  * \param[in] d         OSP device
1403  *
1404  * \retval 0            on success
1405  * \retval negative     negated errno on error
1406  */
1407 int osp_sync_init(const struct lu_env *env, struct osp_device *d)
1408 {
1409         struct l_wait_info       lwi = { 0 };
1410         struct task_struct      *task;
1411         int                      rc;
1412
1413         ENTRY;
1414
1415         rc = osp_sync_id_traction_init(d);
1416         if (rc)
1417                 RETURN(rc);
1418
1419         /*
1420          * initialize llog storing changes
1421          */
1422         rc = osp_sync_llog_init(env, d);
1423         if (rc) {
1424                 CERROR("%s: can't initialize llog: rc = %d\n",
1425                        d->opd_obd->obd_name, rc);
1426                 GOTO(err_id, rc);
1427         }
1428
1429         /*
1430          * Start synchronization thread
1431          */
1432         d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
1433         d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
1434         spin_lock_init(&d->opd_syn_lock);
1435         init_waitqueue_head(&d->opd_syn_waitq);
1436         init_waitqueue_head(&d->opd_syn_barrier_waitq);
1437         init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq);
1438         INIT_LIST_HEAD(&d->opd_syn_inflight_list);
1439         INIT_LIST_HEAD(&d->opd_syn_committed_there);
1440
1441         task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u",
1442                            d->opd_index, d->opd_group);
1443         if (IS_ERR(task)) {
1444                 rc = PTR_ERR(task);
1445                 CERROR("%s: cannot start sync thread: rc = %d\n",
1446                        d->opd_obd->obd_name, rc);
1447                 GOTO(err_llog, rc);
1448         }
1449
1450         l_wait_event(d->opd_syn_thread.t_ctl_waitq,
1451                      osp_sync_running(d) || osp_sync_stopped(d), &lwi);
1452
1453         RETURN(0);
1454 err_llog:
1455         osp_sync_llog_fini(env, d);
1456 err_id:
1457         osp_sync_id_traction_fini(d);
1458         return rc;
1459 }
1460
1461 /**
1462  * Stop the syncing thread.
1463  *
1464  * Asks the syncing thread to stop and wait until it's stopped.
1465  *
1466  * \param[in] d         OSP device
1467  *
1468  * \retval              0
1469  */
1470 int osp_sync_fini(struct osp_device *d)
1471 {
1472         struct ptlrpc_thread *thread = &d->opd_syn_thread;
1473
1474         ENTRY;
1475
1476         thread->t_flags = SVC_STOPPING;
1477         wake_up(&d->opd_syn_waitq);
1478         wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
1479
1480         /*
1481          * unregister transaction callbacks only when sync thread
1482          * has finished operations with llog
1483          */
1484         osp_sync_id_traction_fini(d);
1485
1486         RETURN(0);
1487 }
1488
/* Registry of per-OSD commit trackers (see osp_sync_id_traction_init()):
 * each local OSD device has at most one tracker shared by all OSPs using
 * it; the mutex protects the list and the trackers' refcounts. */
static DEFINE_MUTEX(osp_id_tracker_sem);
static struct list_head osp_id_tracker_list =
                LIST_HEAD_INIT(osp_id_tracker_list);
1492
1493 /**
1494  * OSD commit callback.
1495  *
1496  * The function is used as a local OSD commit callback to track the highest
1497  * committed llog record id. see osp_sync_id_traction_init() for the details.
1498  *
1499  * \param[in] th        local transaction handle committed
1500  * \param[in] cookie    commit callback data (our private structure)
1501  */
1502 static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
1503 {
1504         struct osp_id_tracker   *tr = cookie;
1505         struct osp_device       *d;
1506         struct osp_txn_info     *txn;
1507
1508         LASSERT(tr);
1509
1510         txn = osp_txn_info(&th->th_ctx);
1511         if (txn == NULL || txn->oti_current_id < tr->otr_committed_id)
1512                 return;
1513
1514         spin_lock(&tr->otr_lock);
1515         if (likely(txn->oti_current_id > tr->otr_committed_id)) {
1516                 CDEBUG(D_OTHER, "committed: %llu -> %llu\n",
1517                        tr->otr_committed_id, txn->oti_current_id);
1518                 tr->otr_committed_id = txn->oti_current_id;
1519
1520                 list_for_each_entry(d, &tr->otr_wakeup_list,
1521                                     opd_syn_ontrack) {
1522                         d->opd_syn_last_committed_id = tr->otr_committed_id;
1523                         wake_up(&d->opd_syn_waitq);
1524                 }
1525         }
1526         spin_unlock(&tr->otr_lock);
1527 }
1528
1529 /**
1530  * Initialize commit tracking mechanism.
1531  *
1532  * Some setups may have thousands of OSTs and each will be represented by OSP.
1533  * Meaning order of magnitute many more changes to apply every second. In order
1534  * to keep the number of commit callbacks low this mechanism was introduced.
1535  * The mechanism is very similar to transno used by MDT service: it's an single
1536  * ID stream which can be assigned by any OSP to its llog records. The tricky
1537  * part is that ID is stored in per-transaction data and re-used by all the OSPs
1538  * involved in that transaction. Then all these OSPs are woken up utilizing a single OSD commit callback.
1539  *
1540  * The function initializes the data used by the tracker described above.
1541  * A singler tracker per OSD device is created.
1542  *
1543  * \param[in] d         OSP device
1544  *
1545  * \retval 0            on success
1546  * \retval negative     negated errno on error
1547  */
1548 static int osp_sync_id_traction_init(struct osp_device *d)
1549 {
1550         struct osp_id_tracker   *tr, *found = NULL;
1551         int                      rc = 0;
1552
1553         LASSERT(d);
1554         LASSERT(d->opd_storage);
1555         LASSERT(d->opd_syn_tracker == NULL);
1556         INIT_LIST_HEAD(&d->opd_syn_ontrack);
1557
1558         mutex_lock(&osp_id_tracker_sem);
1559         list_for_each_entry(tr, &osp_id_tracker_list, otr_list) {
1560                 if (tr->otr_dev == d->opd_storage) {
1561                         LASSERT(atomic_read(&tr->otr_refcount));
1562                         atomic_inc(&tr->otr_refcount);
1563                         d->opd_syn_tracker = tr;
1564                         found = tr;
1565                         break;
1566                 }
1567         }
1568
1569         if (found == NULL) {
1570                 rc = -ENOMEM;
1571                 OBD_ALLOC_PTR(tr);
1572                 if (tr) {
1573                         d->opd_syn_tracker = tr;
1574                         spin_lock_init(&tr->otr_lock);
1575                         tr->otr_dev = d->opd_storage;
1576                         tr->otr_next_id = 1;
1577                         tr->otr_committed_id = 0;
1578                         atomic_set(&tr->otr_refcount, 1);
1579                         INIT_LIST_HEAD(&tr->otr_wakeup_list);
1580                         list_add(&tr->otr_list, &osp_id_tracker_list);
1581                         tr->otr_tx_cb.dtc_txn_commit =
1582                                                 osp_sync_tracker_commit_cb;
1583                         tr->otr_tx_cb.dtc_cookie = tr;
1584                         tr->otr_tx_cb.dtc_tag = LCT_MD_THREAD;
1585                         dt_txn_callback_add(d->opd_storage, &tr->otr_tx_cb);
1586                         rc = 0;
1587                 }
1588         }
1589         mutex_unlock(&osp_id_tracker_sem);
1590
1591         return rc;
1592 }
1593
1594 /**
1595  * Release commit tracker.
1596  *
1597  * Decrease a refcounter on the tracker used by the given OSP device \a d.
1598  * If no more users left, then the tracker is released.
1599  *
1600  * \param[in] d         OSP device
1601  */
1602 static void osp_sync_id_traction_fini(struct osp_device *d)
1603 {
1604         struct osp_id_tracker *tr;
1605
1606         ENTRY;
1607
1608         LASSERT(d);
1609         tr = d->opd_syn_tracker;
1610         if (tr == NULL) {
1611                 EXIT;
1612                 return;
1613         }
1614
1615         osp_sync_remove_from_tracker(d);
1616
1617         mutex_lock(&osp_id_tracker_sem);
1618         if (atomic_dec_and_test(&tr->otr_refcount)) {
1619                 dt_txn_callback_del(d->opd_storage, &tr->otr_tx_cb);
1620                 LASSERT(list_empty(&tr->otr_wakeup_list));
1621                 list_del(&tr->otr_list);
1622                 OBD_FREE_PTR(tr);
1623                 d->opd_syn_tracker = NULL;
1624         }
1625         mutex_unlock(&osp_id_tracker_sem);
1626
1627         EXIT;
1628 }
1629
1630 /**
1631  * Generate a new ID on a tracker.
1632  *
1633  * Generates a new ID using the tracker associated with the given OSP device
1634  * \a d, if the given ID \a id is zero. Unconditionally adds OSP device to
1635  * the wakeup list, so OSP won't miss when a transaction using the ID is
1636  * committed.
1637  *
1638  * \param[in] d         OSP device
1639  * \param[in] id        0 or ID generated previously
1640  *
1641  * \retval              ID the caller should use
1642  */
1643 static __u64 osp_sync_id_get(struct osp_device *d, __u64 id)
1644 {
1645         struct osp_id_tracker *tr;
1646
1647         tr = d->opd_syn_tracker;
1648         LASSERT(tr);
1649
1650         /* XXX: we can improve this introducing per-cpu preallocated ids? */
1651         spin_lock(&tr->otr_lock);
1652         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_TRACK_OVERFLOW))
1653                 tr->otr_next_id = 0xfffffff0;
1654
1655         if (unlikely(tr->otr_next_id <= d->opd_syn_last_used_id)) {
1656                 spin_unlock(&tr->otr_lock);
1657                 CERROR("%s: next %llu, last synced %llu\n",
1658                        d->opd_obd->obd_name, tr->otr_next_id,
1659                        d->opd_syn_last_used_id);
1660                 LBUG();
1661         }
1662
1663         if (id == 0)
1664                 id = tr->otr_next_id++;
1665         if (id > d->opd_syn_last_used_id)
1666                 d->opd_syn_last_used_id = id;
1667         if (list_empty(&d->opd_syn_ontrack))
1668                 list_add(&d->opd_syn_ontrack, &tr->otr_wakeup_list);
1669         spin_unlock(&tr->otr_lock);
1670         CDEBUG(D_OTHER, "new id %llu\n", id);
1671
1672         return id;
1673 }
1674
1675 /**
1676  * Stop propagating commit status to OSP.
1677  *
1678  * If the OSP does not have any llog records it is waiting to commit, then
1679  * it is possible to unsubscribe from tracker wakeups using this
1680  * method.
1681  *
1682  * \param[in] d         OSP device that no longer needs wakeups
1683  */
1684 static void osp_sync_remove_from_tracker(struct osp_device *d)
1685 {
1686         struct osp_id_tracker *tr;
1687
1688         tr = d->opd_syn_tracker;
1689         LASSERT(tr);
1690
1691         if (list_empty(&d->opd_syn_ontrack))
1692                 return;
1693
1694         spin_lock(&tr->otr_lock);
1695         list_del_init(&d->opd_syn_ontrack);
1696         spin_unlock(&tr->otr_lock);
1697 }
1698