首页 > spark 怎么对一个内容为一批Iterable[Double]的Rdd中的每一个Iterable[Double]做增值修改

spark 怎么对一个内容为一批Iterable[Double]的Rdd中的每一个Iterable[Double]做增值修改

小弟刚学spark,遇到个问题,求指教:

val rdd = ... // 内容是 Iterable[Double]

现在想对rdd的每一个Iterable[Double]中按某种方法插入某些元素,怎么做?

我写了个函数做插值,在masterlocal的时候没有问题,master使用多worker就错了,怎么写并发安全spark程序啊?

栗子:

L1: [(103, v3),(106, v6)]
L2: [(101, v1),(102, v2),(103, v3),(104, v4),(105, v5),(106, v6)]

就是从L1变到L2, 其中v3,v6是已知的值,v1,v2,v4,v5使用默认的0

更新:

case class Metric(name: String, value: Double, timestamp: Long)
implicit val metricReads = Json.format[Metric]

......
val input = sc.textFile("/data/xxxxxxx.txt") // 每行一个json结构
val parsed = input.map(Json.parse(_))  // Parse json 
val result = parsed.flatMap(record => metricReads.reads(record).asOpt) // 得到Metric的rdd
val grouped = result.groupBy(_.name) // 根据Metric的name字段做groupBy,得到RDD[(name, Iterable[Metric])]
val output = grouped.mapValues(XXX) 

因为每个Iterable[Metric]的个数都不相等,所以需要在mapValues中对每个Iterable[Metric]做插值,都插成360个点的集合,插值逻辑就是按照timestamp每隔10s插一个点。

xxxxxxx.txt

{"name": "foo", "value": 1.1, "timestamp": 1469682268}
{"name": "bar", "value": 1.2, "timestamp": 1469682276}
{"name": "foo", "value": 1.3, "timestamp": 1469682417}
{"name": "bar", "value": 1.4, "timestamp": 1469683205}
{"name": "foo", "value": 1.5, "timestamp": 1469683369}

groupBy之后:

[
    ("foo", [{"timestamp": 1469682268, "value": 1.1}, {"timestamp": 1469682417, "value": 1.3}, {"timestamp": 1469683369, "value": 1.5}]), 
    ("bar", [{"timestamp": 1469682276, "value": 1.4}, {"timestamp": 1469683205, "value": 1.5}])
]

最终就是要让foobar的点数相同,每隔10s插一个点,共360个,上面的时间都是2016-07-28 13:00:002016-07-28 14:00:00 之间的,这段时间内的每个0~10s间隔之内有值就不插,没值的话就用0。比如:1469682000~1469682010之间没有值,就用0补,1469682260~1469682270之间有值1.1,就用1.1


这样做是没法并发的,如果你要扩充rdd,可以把需要插入的数据也做成一个rdd,然后用spark提供的join、union、intersection等操作来处理两个rdd

参考程序,没有加json依赖,所以自己parse的

    def parseLine(line: String): (String, (Long, Double)) = {
      val PATTERN = """^\{\"name\": \"(.*)\", \"value\": (.*), \"timestamp\": (.*)\}$""".r
      val res = PATTERN.findFirstMatchIn(line)
      if (res.isEmpty) {
        throw new RuntimeException("Cannot parse line: " + line)
      }
      val m = res.get
      (m.group(1), (m.group(3).toLong, m.group(2).toDouble))
    }

    object Handler extends Serializable {
      def handle(entry: (String, Iterable[(Long, Double)])): Map[Long, Double] = {
        val set = entry._2.map(_._1 / 10).toSet
        var map = entry._2.toMap
        for (i <- 146971080 until 146971441) {
          if (!set.contains(i)) {
            map += (i.toLong * 10L -> 0.0)
          }
        }
        // Logic
        map
      }
    }

    sc.textFile("./data/tmp.json")
      .map(parseLine)
      .groupByKey()
      .foreach(entry => Handler.handle(entry))
【热门文章】
【热门文章】