{-# LANGUAGE TypeFamilies #-}
module Math.HiddenMarkovModel.Normalized where
import qualified Math.HiddenMarkovModel.Distribution as Distr
import Math.HiddenMarkovModel.Private
(T(..), Trained(..), emission,
biscaleTransition, matrixMaxMul, sumTransitions)
import Math.HiddenMarkovModel.Utility
(SquareMatrix, normalizeFactor, normalizeProb)
import qualified Numeric.LAPACK.Vector as Vector
import Numeric.LAPACK.Matrix ((<#), (#>))
import Numeric.LAPACK.Vector (Vector)
import qualified Numeric.Netlib.Class as Class
import qualified Control.Functor.HT as Functor
import qualified Data.Array.Comfort.Storable as StorableArray
import qualified Data.Array.Comfort.Boxed as Array
import qualified Data.Array.Comfort.Shape as Shape
import qualified Data.NonEmpty.Class as NonEmptyC
import qualified Data.NonEmpty as NonEmpty
import qualified Data.Foldable as Fold
import qualified Data.List as List
import Data.Traversable (Traversable, mapAccumL)
import Data.Tuple.HT (mapFst, mapSnd, swap)
logLikelihood ::
(Distr.EmissionProb distr, Distr.StateShape distr ~ sh, Eq sh, Floating prob,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission,
Traversable f) =>
T distr sh prob -> NonEmpty.T f emission -> prob
logLikelihood hmm = Fold.sum . fmap (log . fst) . alpha hmm
alpha ::
(Distr.EmissionProb distr, Distr.StateShape distr ~ sh, Eq sh,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission,
Traversable f) =>
T distr sh prob ->
NonEmpty.T f emission -> NonEmpty.T f (prob, Vector sh prob)
alpha hmm (NonEmpty.Cons x xs) =
let normMulEmiss y = normalizeFactor . Vector.mul (emission hmm y)
in NonEmpty.scanl
(\(_,alphai) xi -> normMulEmiss xi (transition hmm #> alphai))
(normMulEmiss x (initial hmm))
xs
beta ::
(Distr.EmissionProb distr, Distr.StateShape distr ~ sh, Eq sh,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission,
Traversable f, NonEmptyC.Reverse f) =>
T distr sh prob ->
f (prob, emission) -> NonEmpty.T f (Vector sh prob)
beta hmm =
nonEmptyScanr
(\(ci,xi) betai ->
Vector.scale (recip ci) $
Vector.mul (emission hmm xi) betai <# transition hmm)
(Vector.constant (StorableArray.shape $ initial hmm) 1)
alphaBeta ::
(Distr.EmissionProb distr, Distr.StateShape distr ~ sh, Eq sh,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission,
Traversable f, NonEmptyC.Zip f, NonEmptyC.Reverse f) =>
T distr sh prob ->
NonEmpty.T f emission ->
(NonEmpty.T f (prob, Vector sh prob), NonEmpty.T f (Vector sh prob))
alphaBeta hmm xs =
let calphas = alpha hmm xs
in (calphas,
beta hmm $ NonEmpty.tail $ NonEmptyC.zip (fmap fst calphas) xs)
xiFromAlphaBeta ::
(Distr.EmissionProb distr, Distr.StateShape distr ~ sh, Eq sh,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission,
Traversable f, NonEmptyC.Zip f) =>
T distr sh prob ->
NonEmpty.T f emission ->
NonEmpty.T f (prob, Vector sh prob) ->
NonEmpty.T f (Vector sh prob) ->
f (SquareMatrix sh prob)
xiFromAlphaBeta hmm xs calphas betas =
let (cs,alphas) = Functor.unzip calphas
in NonEmptyC.zipWith4
(\x alpha0 c1 beta1 ->
Vector.scale (recip c1) $ biscaleTransition hmm x alpha0 beta1)
(NonEmpty.tail xs)
(NonEmpty.init alphas)
(NonEmpty.tail cs)
(NonEmpty.tail betas)
zetaFromAlphaBeta ::
(Shape.C sh, Eq sh, Class.Real prob, NonEmptyC.Zip f) =>
NonEmpty.T f (prob, Vector sh prob) ->
NonEmpty.T f (Vector sh prob) ->
NonEmpty.T f (Vector sh prob)
zetaFromAlphaBeta calphas betas =
NonEmptyC.zipWith (Vector.mul . snd) calphas betas
reveal ::
(Distr.EmissionProb distr, Distr.StateShape distr ~ sh,
Shape.InvIndexed sh, Eq sh, Shape.Index sh ~ state,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission,
Traversable f, NonEmptyC.Reverse f) =>
T distr sh prob -> NonEmpty.T f emission -> NonEmpty.T f state
reveal hmm (NonEmpty.Cons x xs) =
fmap (Shape.revealIndex (StorableArray.shape $ initial hmm)) $
uncurry (NonEmpty.scanr (StorableArray.!)) $
mapFst
(fst . Vector.argAbsMaximum .
StorableArray.mapShape Shape.Deferred) $
mapAccumL
(\alphai xi ->
swap $ mapSnd (Vector.mul (emission hmm xi)) $
matrixMaxMul (transition hmm) $ normalizeProb alphai)
(Vector.mul (emission hmm x) (initial hmm)) xs
nonEmptyScanr ::
(Traversable f, NonEmptyC.Reverse f) =>
(a -> b -> b) -> b -> f a -> NonEmpty.T f b
nonEmptyScanr f x =
NonEmptyC.reverse . NonEmpty.scanl (flip f) x . NonEmptyC.reverse
trainUnsupervised ::
(Distr.Estimate tdistr distr, Distr.StateShape distr ~ sh, Eq sh,
Distr.Probability distr ~ prob, Distr.Emission distr ~ emission) =>
T distr sh prob -> NonEmpty.T [] emission -> Trained tdistr sh prob
trainUnsupervised hmm xs =
let (alphas, betas) = alphaBeta hmm xs
zetas = zetaFromAlphaBeta alphas betas
zeta0 = NonEmpty.head zetas
in Trained {
trainedInitial = zeta0,
trainedTransition =
sumTransitions hmm $ xiFromAlphaBeta hmm xs alphas betas,
trainedDistribution =
Distr.accumulateEmissions $
Array.fromList (StorableArray.shape zeta0) $
map (zip (NonEmpty.flatten xs)) $
List.transpose $ map Vector.toList $ NonEmpty.flatten zetas
}