Task-Oriented Dialog System

NLU: natural language understanding
DST: dialog state tracker
DPL: dialog policy learning
NLG: natural language generator

Pipeline dialog systems have increasingly been replaced by end-to-end methods as neural network methods have become more popular.

Multi-domain Dialog System

Reinforcement Learning

A Teacher-Student Framework for Maintainable Dialog Manager

knowledge infusion

Multi-domain Dialogue State Tracking as Dynamic Knowledge Graph Enhanced Question Answering

NLU benchmark dataset

DSTC (Dialog State Tracking Challenge)

According to "The Dialog State Tracking Challenge Series: A Review" [1], the DSTC series introduced the first shared testbed and evaluation metrics for dialog state tracking.

DSTC1 used a corpus of dialogs with various systems that participated in the Spoken Dialog Challenge (SDC) (Black et al., 2010), provided by the Dialog Research Center at Carnegie Mellon University. In the SDC, telephone calls from real passengers of the Port Authority of Allegheny County, which runs city buses in Pittsburgh, were forwarded to dialog systems built by different research groups. The goal was to provide bus riders with bus timetable information. For example, a caller might want to find out the time of the next bus leaving from Downtown to the airport. In this domain, the goal of the user typically remains fixed for the duration of the dialog.

DSTC2 aimed to extend the results of DSTC1 to another domain, as well as broaden the scope to include user goal changes. This challenge relied on a corpus of dialogs in the restaurant search domain between paid participants (through Amazon Mechanical Turk) and various systems developed at Cambridge University (Young et al., 2014). The goal of the user is to find specific information such as price range or phone number about a restaurant that fulfills a number of constraints such as cuisine or neighborhood.
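To make the idea of a changing user goal concrete, here is a minimal sketch in Java of a rule-based dialog state for the restaurant domain. The class name SimpleDialogState, the slot names, and the "latest mention wins" rule are illustrative assumptions, not part of DSTC2 or its baseline trackers. Real trackers usually keep a probability distribution over slot values rather than a single value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical rule-based dialog state: one value per slot, latest mention wins. */
public class SimpleDialogState {
    private final Map<String, String> constraints = new LinkedHashMap<>();

    /** Apply one user turn, given as slot/value pairs produced by some NLU component. */
    public void update(Map<String, String> turnSlots) {
        // A later value for the same slot overwrites the earlier one,
        // which is how this sketch models a user changing their goal.
        constraints.putAll(turnSlots);
    }

    /** Current value of a slot, or null if the user has not constrained it. */
    public String get(String slot) {
        return constraints.get(slot);
    }

    public static void main(String[] args) {
        SimpleDialogState state = new SimpleDialogState();
        state.update(Map.of("food", "italian", "area", "north"));
        state.update(Map.of("food", "chinese")); // the user changes their mind
        System.out.println(state.get("food") + " / " + state.get("area")); // chinese / north
    }
}
```

In a DSTC1-style domain the second update would rarely occur, because the goal typically stays fixed for the whole dialog.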

DSTC3 expanded the domain of DSTC2 to include new slots which do not occur in the training data. This simulates the crucial problem of adapting a dialog system to a new domain for which little dialog data is available, while data for a similar but different domain might already exist. DSTC3 used all the data from DSTC2 as training set, as well as a new set of dialogs (also collected by Cambridge University researchers (Jurčíček et al., 2011)) on a broader tourist information domain, covering bars and cafes in addition to restaurants.

[DSTC7][2]

Reference

[1]: The Dialog State Tracking Challenge Series: A Review
[2]: http://workshop.colips.org/dstc7/workshop.html

MR task read/write parquet file

Parquet

Parquet is an open-source columnar storage format for Apache Hadoop, released by Twitter and Cloudera. Columnar storage has several advantages: for example, columns that a query does not need can be skipped, and because the values within a column share a uniform structure, compression algorithms work well on them. More information is available on the Parquet website and the Cloudera blog. This post does not go further into what Parquet is or how efficiently it stores data.

Currently, parquet provides a state of the art columnar storage layer that can be taken advantage of by existing Hadoop frameworks, and can enable a new generation of Hadoop data processing architectures such as Impala, Drill, and parts of the Hive ‘Stinger’ initiative.
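The two benefits of column storage mentioned above can be illustrated without Parquet itself. The following plain-Java sketch stores records column by column, reads one column without touching the other, and run-length encodes a column of repeated values. The class ColumnarSketch and its methods are hypothetical and do not use the Parquet API; Parquet's real encodings are more elaborate, so this only shows why a uniform column compresses well.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of a columnar layout; not the Parquet API. */
public class ColumnarSketch {

    /** Run-length encode a column: each run becomes a {value, count} pair. */
    public static List<long[]> runLengthEncode(long[] column) {
        List<long[]> runs = new ArrayList<>();
        for (long v : column) {
            long[] last = runs.isEmpty() ? null : runs.get(runs.size() - 1);
            if (last != null && last[0] == v) {
                last[1]++;                     // extend the current run
            } else {
                runs.add(new long[] {v, 1});   // start a new run
            }
        }
        return runs;
    }

    /** Sum one column; the other columns are never read. */
    public static long sum(long[] column) {
        long total = 0;
        for (long v : column) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        // Row view of the data: (id, cityCode) = (1,7), (2,7), (3,7), (4,9)
        long[] ids = {1, 2, 3, 4};         // column "id"
        long[] cityCodes = {7, 7, 7, 9};   // column "cityCode"

        System.out.println(sum(ids));                          // 10, only the id column is scanned
        System.out.println(runLengthEncode(cityCodes).size()); // 2, four values stored as two runs
    }
}
```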

MR task read/write parquet file

write parquet file

We use the simplest possible MapReduce job: a map-only job with no reducer.

1.set input path and input format

TextInputFormat.addInputPath(writeToParquetJob, inputPath);
writeToParquetJob.setInputFormatClass(TextInputFormat.class);
writeToParquetJob.setMapperClass(ParquetMapper.class);

2.define ParquetMapper

public static class ParquetMapper extends Mapper<LongWritable, Text, Void, Person> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is expected to be "<id>\t<name>"; skip malformed lines.
        String[] array = value.toString().split("\t");
        if (array.length != 2) return;
        Person person = new Person();
        person.setId(Long.parseLong(array[0]));
        person.setName(array[1]);
        // The output key type is Void, so the record is written with a null key.
        context.write(null, person);
    }
}

3.set output format with thrift class and output path

writeToParquetJob.setNumReduceTasks(0);
writeToParquetJob.setOutputFormatClass(ParquetThriftOutputFormat.class);
ParquetThriftOutputFormat.setCompression(writeToParquetJob, CompressionCodecName.GZIP);
ParquetThriftOutputFormat.setOutputPath(writeToParquetJob, parquetPath);
ParquetThriftOutputFormat.setThriftClass(writeToParquetJob, Person.class);

4.run mapreduce task

boolean ret = writeToParquetJob.waitForCompletion(true);

read parquet file

1.set input path and input format

MultipleInputs.addInputPath(readParquetJob, parquetPath, ParquetThriftInputFormat.class, ParquetReader.class);

2.define ParquetReader

public static class ParquetReader extends Mapper<Void, Person, LongWritable, Text> {
    @Override
    public void map(Void key, Person value, Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(value.getId()), new Text(value.getName()));
    }
}

3.set output format and output path

readParquetJob.setNumReduceTasks(0);
readParquetJob.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(readParquetJob, outputPath);

4.run mapreduce task

boolean ret = readParquetJob.waitForCompletion(true);

In addition, the Thrift struct Person is defined as follows:

struct Person {
    1:optional i64 id;
    2:optional string name;
}
NMF based on Spark GraphX

NMF code

NMF based on Spark GraphX is implemented as follows. I have tested it successfully, and I will describe the structure and design of the NMF algorithm in a later post.

/**
 * Graph NMF algorithm implementation.
 *
 * Implementation Idea based on Pregel
 *
 * During the iteration, design the direction of the message propagating
 * The initial iteration number is 1
 * When iteration number is odd, forward propagate
 * 	update matrix W and use matrix H to propagate
 * When iteration number is even, back propagate
 * 	update matrix H and use matrix W to propagate
 *
 * vertex attribute is [(Vector,Vector)], the first Vector is Vector W, and
 * the other is Vector H
 *
 * update Rule:
 * W_i <- (1 - theta*lambda) * W_i + theta * sum_j{ (d_ij - W_i*H_j) * H_j }
 * H_j <- (1 - theta*lambda) * H_j + theta * sum_i{ (d_ij - W_i*H_j) * W_i }
 * after each update, negative elements are set to 0
 *
 * `theta` is the step size
 * `lambda` is the regularization coefficient
 *
 */
object ICTGraphNMF extends Logging with Serializable {
  /**
   * Run GraphNMF for a fixed number of iterations, returning a graph whose
   * vertex attributes contain the two vectors W and H
   * and whose edge attributes contain the edge weight.
   *
   * @tparam VD the original vertex attribute (not used)
   *
   * @param graph the graph on which to run NMF, the edge attribute must be Double
   * @param maxIterations the maximum number of iterations
   * @param theta the step size
   * @param lambda the regularization coefficient
   * @param reducedDim the reduced dimension in the NMF algorithm
   *
   * @return the graph containing the two Vectors which is Vector W and Vector H
   * and edge attributes containing the edge weight.
   *
   */
  def run[VD: ClassTag](graph: Graph[VD, Double],
    maxIterations: Int = Int.MaxValue,
    theta: Double = 0.01,
    lambda: Double = 0.1,
    reducedDim: Int = 2) = {

    def forwardVertexProgram(id: VertexId, attri: (Vector, Vector), msgSum: Vector): (Vector, Vector) = {
      val scale = 1 - theta * lambda
      val intercept = theta * msgSum
      val newV = scale * attri._1 + intercept
      if (newV.elements.count(elem => elem < 0.0) == 0)
        (newV, attri._2)
      else {
        val newElementsNonZero = newV.elements.map(elem => if (elem > 0.0) elem else 0.0)
        (Vector(newElementsNonZero), attri._2)
      }
    }
    def backVertexProgram(id: VertexId, attri: (Vector, Vector), msgSum: Vector): (Vector, Vector) = {
      val scale = 1 - theta * lambda
      val intercept = theta * msgSum
      val newV = scale * attri._2 + intercept
      if (newV.elements.count(elem => elem < 0.0) == 0)
        (attri._1, newV)
      else {
        val newElementsNonZero = newV.elements.map(elem => if (elem > 0.0) elem else 0.0)
        (attri._1, Vector(newElementsNonZero))
      }
    }
    def forwardSendMessage(edge: EdgeTriplet[(Vector, Vector), Double]) = {
      Iterator((edge.srcId, (edge.attr - edge.srcAttr._1.dot(edge.dstAttr._2)) * edge.dstAttr._2))
    }
    def backSendMessage(edge: EdgeTriplet[(Vector, Vector), Double]) = {
      Iterator((edge.dstId, (edge.attr - edge.srcAttr._1.dot(edge.dstAttr._2)) * edge.srcAttr._1))
    }
    def messageCombiner(a: Vector, b: Vector): Vector = a + b

    // initialize each vertex's vectors W and H, each of dimension reducedDim, with random values
    var curGraph: Graph[(Vector, Vector), Double] = graph
      .mapVertices((vid, vdata) =>
        (Vector(Array.fill(reducedDim)(Random.nextDouble)),
          Vector(Array.fill(reducedDim)(Random.nextDouble)))).cache()

    //    var curGraph = nmfGraph //.cache()
    var messages = curGraph.mapReduceTriplets(forwardSendMessage, messageCombiner)
    var activeMessages = messages.count()
    var curIteration: Int = 1

    var prevGraph: Graph[(Vector, Vector), Double] = null
    while (activeMessages > 0 && (curIteration - 1) / 2 < maxIterations) {
      if ((curIteration - 1) % 2 == 0) {
        logDebug("Graph Information\n")
        logDebug(curGraph.vertices.collect().mkString("\n"))
        logDebug("GraphNMF iteration:" + ((curIteration + 1) / 2).toString)
        logDebug("forward propagating\npropagating messages:")
        logDebug(messages.collect().mkString("\n"))

        val newVerts: VertexRDD[(Vector, Vector)] = curGraph.vertices.innerJoin(messages)(forwardVertexProgram).cache()
        prevGraph = curGraph
        curGraph = curGraph.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
        curGraph.cache()

        val oldMessages = messages
        messages = curGraph.mapReduceTriplets(backSendMessage, messageCombiner).cache()
        oldMessages.unpersist(blocking = false)
        newVerts.unpersist(blocking = false)
      } else {
        logDebug("back propagating\npropagating messages:")
        logDebug(messages.collect().mkString("\n"))

        val newVerts = curGraph.vertices.innerJoin(messages)(backVertexProgram).cache()
        prevGraph = curGraph
        curGraph = curGraph.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
        curGraph.cache()

        val oldMessages = messages
        messages = curGraph.mapReduceTriplets(forwardSendMessage, messageCombiner).cache()
        oldMessages.unpersist(blocking = false)
        newVerts.unpersist(blocking = false)
      }
      activeMessages = messages.count()

      prevGraph.unpersistVertices(blocking = false)
      prevGraph.edges.unpersist(blocking = false)
      curIteration += 1
    }
    curGraph
  }
  /**
   * Variant of run for a fixed number of iterations. Judging from the update
   * code, absent edges are treated as zero entries: each vertex update subtracts
   * the vector multiplied by the summed outer products MatrixH or MatrixW.
   * Returns a graph whose vertex attributes contain the two vectors W and H
   * and whose edge attributes contain the edge weight.
   *
   * @tparam VD the original vertex attribute (not used)
   *
   * @param graph the graph on which to run NMF, the edge attribute must be Double
   * @param maxIterations the maximum number of iterations
   * @param theta the step size
   * @param lambda the regularization coefficient
   * @param reducedDim the reduced dimension in the NMF algorithm
   *
   * @return the graph containing the two Vectors which is Vector W and Vector H
   * and edge attributes containing the edge weight.
   *
   */
  def runWithZero[VD: ClassTag](graph: Graph[VD, Double],
    maxIterations: Int = Int.MaxValue,
    theta: Double = 0.01,
    lambda: Double = 0.1,
    reducedDim: Int = 2) = {
    //    val matrixWAccumulator = sc.accumulator(new Array[Double](reducedDim * reducedDim), "MatrixW")
    //    val matrixHAccumulator = sc.accumulator(new Array[Double](reducedDim * reducedDim), "MatrixH")

    var MatrixH = new Array[Double](reducedDim * reducedDim)
    var MatrixW = new Array[Double](reducedDim * reducedDim)

    /**
     * Multiply a vector by a reducedDim x reducedDim matrix stored as a flat array:
     * result(i) = sum_j vec(j) * arr(j + reducedDim * i), for i = 0, ..., reducedDim-1
     */
    def mutiplyVM(vec: Vector, arr: Array[Double]): Vector = {
      var result = new Array[Double](reducedDim)
      val vecElems = vec.elements
      for (i <- 0 to reducedDim - 1) {
        for (j <- 0 to reducedDim - 1) {
          result(i) += (vecElems(j) * arr(j + reducedDim * i))
        }
      }
      Vector(result)
    }
    def forwardVertexProgram(id: VertexId, attri: (Vector, Vector), msgSum: Vector): (Vector, Vector) = {
      val scale = 1 - theta * lambda
      val intercept = theta * (msgSum - mutiplyVM(attri._1, MatrixH))
      val newV = scale * attri._1 + intercept
      if (newV.elements.count(elem => elem < 0.0) == 0)
        (newV, attri._2)
      else {
        val newElementsNonZero = newV.elements.map(elem => if (elem > 0.0) elem else 0.0)
        (Vector(newElementsNonZero), attri._2)
      }
    }
    def backVertexProgram(id: VertexId, attri: (Vector, Vector), msgSum: Vector): (Vector, Vector) = {
      val scale = 1 - theta * lambda
      val intercept = theta * (msgSum - mutiplyVM(attri._2, MatrixW))
      val newV = scale * attri._2 + intercept
      if (newV.elements.count(elem => elem < 0.0) == 0)
        (attri._1, newV)
      else {
        val newElementsNonZero = newV.elements.map(elem => if (elem > 0.0) elem else 0.0)
        (attri._1, Vector(newElementsNonZero))
      }
    }
    def forwardSendMessage(edge: EdgeTriplet[(Vector, Vector), Double]) = {
      Iterator((edge.srcId, edge.dstAttr._2.multiply(edge.attr)))
    }
    def backSendMessage(edge: EdgeTriplet[(Vector, Vector), Double]) = {
      Iterator((edge.dstId, edge.srcAttr._1.multiply(edge.attr)))
    }
    def messageCombiner(a: Vector, b: Vector): Vector = a + b

    // initialize each vertex's vectors W and H, each of dimension reducedDim, with random values
    var curGraph: Graph[(Vector, Vector), Double] = graph
      .mapVertices((vid, vdata) =>
        (Vector(Array.fill(reducedDim)(Random.nextDouble)),
          Vector(Array.fill(reducedDim)(Random.nextDouble)))).cache()

    //    var curGraph = nmfGraph //.cache()
    var messages = curGraph.mapReduceTriplets(forwardSendMessage, messageCombiner)
    var activeMessages = messages.count()
    var curIteration: Int = 1

    var prevGraph: Graph[(Vector, Vector), Double] = null
    while (activeMessages > 0 && (curIteration - 1) / 2 < maxIterations) {
      if ((curIteration - 1) % 2 == 0) {
        logDebug("Graph Information\n")
        logDebug(curGraph.vertices.collect().mkString("\n"))
        logDebug("GraphNMF iteration:" + ((curIteration + 1) / 2).toString)
        logDebug("forward propagating\npropagating messages:")
        logDebug(messages.collect().mkString("\n"))

        // Sum the outer products H_j' * H_j over all vertices. An RDD map() is lazy and
        // runs on serialized copies of driver-side variables, so the sums are returned
        // through reduce() rather than written into the array from inside the closure.
        MatrixH = curGraph.vertices.map { case (_, (_, h)) =>
          val e = h.elements
          Array.tabulate(reducedDim * reducedDim)(k => e(k / reducedDim) * e(k % reducedDim))
        }.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })

        // Same accumulation for W: the sum of W_i' * W_i over all vertices.
        MatrixW = curGraph.vertices.map { case (_, (w, _)) =>
          val e = w.elements
          Array.tabulate(reducedDim * reducedDim)(k => e(k / reducedDim) * e(k % reducedDim))
        }.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })

        val newVerts: VertexRDD[(Vector, Vector)] = curGraph.vertices.innerJoin(messages)(forwardVertexProgram).cache()
        prevGraph = curGraph
        curGraph = curGraph.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
        curGraph.cache()

        val oldMessages = messages
        messages = curGraph.mapReduceTriplets(backSendMessage, messageCombiner).cache()
        oldMessages.unpersist(blocking = false)
        newVerts.unpersist(blocking = false)
      } else {
        logDebug("Graph Information\n")
        logDebug(curGraph.vertices.collect().mkString("\n"))
        logDebug("back propagating\npropagating messages:")
        logDebug(messages.collect().mkString("\n"))

        val newVerts = curGraph.vertices.innerJoin(messages)(backVertexProgram).cache()
        prevGraph = curGraph
        curGraph = curGraph.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
        curGraph.cache()

        val oldMessages = messages
        messages = curGraph.mapReduceTriplets(forwardSendMessage, messageCombiner).cache()
        oldMessages.unpersist(blocking = false)
        newVerts.unpersist(blocking = false)
      }
      activeMessages = messages.count()

      prevGraph.unpersistVertices(blocking = false)
      prevGraph.edges.unpersist(blocking = false)
      curIteration += 1
    }
    curGraph
  }
}
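The update rule from the header comment can be checked on a small dense matrix without Spark. The sketch below is plain Java and is an illustration under stated assumptions, not a port of ICTGraphNMF: the class name DenseNmfSketch, the fixed initial values, and the dense loops (instead of message passing over edges) are choices made for the example. It applies the same step, scaling by 1 - theta * lambda, adding theta times the summed residual terms, and clamping negative elements to zero as the vertex programs do.

```java
/** Hypothetical dense illustration of the update rule; not ICTGraphNMF itself. */
public class DenseNmfSketch {

    static double dot(double[] a, double[] b) {
        double s = 0.0;
        for (int k = 0; k < a.length; k++) {
            s += a[k] * b[k];
        }
        return s;
    }

    /** Squared reconstruction error: sum over i, j of (d_ij - W_i . H_j)^2. */
    public static double error(double[][] d, double[][] w, double[][] h) {
        double e = 0.0;
        for (int i = 0; i < d.length; i++) {
            for (int j = 0; j < d[i].length; j++) {
                double r = d[i][j] - dot(w[i], h[j]);
                e += r * r;
            }
        }
        return e;
    }

    /** One forward step (update every W_i) followed by one backward step (update every H_j). */
    public static void step(double[][] d, double[][] w, double[][] h, double theta, double lambda) {
        int n = d.length, m = d[0].length, dim = w[0].length;
        double scale = 1 - theta * lambda;
        for (int i = 0; i < n; i++) {              // forward: W_i uses the current H
            double[] sum = new double[dim];
            for (int j = 0; j < m; j++) {
                double r = d[i][j] - dot(w[i], h[j]);
                for (int k = 0; k < dim; k++) sum[k] += r * h[j][k];
            }
            for (int k = 0; k < dim; k++) {
                w[i][k] = Math.max(0.0, scale * w[i][k] + theta * sum[k]);
            }
        }
        for (int j = 0; j < m; j++) {              // backward: H_j uses the updated W
            double[] sum = new double[dim];
            for (int i = 0; i < n; i++) {
                double r = d[i][j] - dot(w[i], h[j]);
                for (int k = 0; k < dim; k++) sum[k] += r * w[i][k];
            }
            for (int k = 0; k < dim; k++) {
                h[j][k] = Math.max(0.0, scale * h[j][k] + theta * sum[k]);
            }
        }
    }

    public static void main(String[] args) {
        double[][] d = {{1, 2}, {2, 4}};           // a rank-1, non-negative matrix
        double[][] w = {{0.5}, {0.5}};             // W_i, reduced dimension 1
        double[][] h = {{0.5}, {0.5}};             // H_j, reduced dimension 1
        double before = error(d, w, h);
        for (int it = 0; it < 200; it++) {
            step(d, w, h, 0.01, 0.1);              // same default theta and lambda as run()
        }
        System.out.println("error before: " + before + ", after: " + error(d, w, h));
    }
}
```

Because lambda shrinks W and H at every step, the error is not expected to reach exactly zero even for a matrix that factorizes perfectly.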
Journey to The West

Java

  • Thinking in Java
  • Effective Java
  • Clean Code
  • Refactoring: Improving the Design of Existing Code

C++

  • Effective C++

Spark

  • Scala

Python

  • Python for Data Analysis