java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined

The following code throws Caused by: java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined. The exception occurs when groupByKey and flatMapGroups are invoked a second time, on the Dataset that was returned by an earlier groupByKey/flatMapGroups call built with an ExpressionEncoder.



Logical flow:
originalDf -> groupByKey -> flatMapGroups -> groupByKey -> flatMapGroups -> show



import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

import scala.collection.mutable.ListBuffer

object Test {

  def main(args: Array[String]): Unit = {

    val values = List(List("1", "One"), List("1", "Two"), List("2", "Three"), List("2", "4"))
      .map(x => (x(0), x(1)))
    val session = SparkSession.builder.config("spark.master", "local").getOrCreate
    import session.implicits._
    val dataFrame = values.toDF

    dataFrame.show()
    dataFrame.printSchema()

    // Original schema plus an appended Count column
    val newSchema = StructType(dataFrame.schema.fields
      ++ Array(StructField("Count", IntegerType, false)))

    val expr = RowEncoder.apply(newSchema)

    // First pass: append the group size to every row of each group
    val tranform = dataFrame.groupByKey(row => row.getAs[String]("_1"))
      .flatMapGroups((key, inputItr) => {
        val inputSeq = inputItr.toSeq
        val length = inputSeq.size
        val listBuff = new ListBuffer[Row]()
        var counter: Int = 0
        for (i <- 0 until length) {
          counter += 1
        }
        for (i <- 0 until length) {
          val x = inputSeq(i)
          listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
        }
        listBuff.iterator
      })(expr)

    tranform.show

    // Second pass over the first pass's result: this is where the exception is thrown
    val newSchema1 = StructType(tranform.schema.fields
      ++ Array(StructField("Count1", IntegerType, false)))
    val expr1 = RowEncoder.apply(newSchema1)
    val tranform2 = tranform.groupByKey(row => row.getAs[String]("_1"))
      .flatMapGroups((key, inputItr) => {
        val inputSeq = inputItr.toSeq
        val length = inputSeq.size
        val listBuff = new ListBuffer[Row]()
        var counter: Int = 0
        for (i <- 0 until length) {
          counter += 1
        }
        for (i <- 0 until length) {
          val x = inputSeq(i)
          listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))
        }
        listBuff.iterator
      })(expr1)

    tranform2.show
  }
}


The stack trace follows:



18/11/21 19:39:03 WARN TaskSetManager: Lost task 144.0 in stage 11.0 (TID 400, localhost, executor driver): java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
    at org.apache.spark.sql.Row$class.fieldIndex(Row.scala:342)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.fieldIndex(rows.scala:166)
    at org.apache.spark.sql.Row$class.getAs(Row.scala:333)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getAs(rows.scala:166)
    at com.quantuting.sparkutils.main.Test$$anonfun$4.apply(Test.scala:59)
    at com.quantuting.sparkutils.main.Test$$anonfun$4.apply(Test.scala:59)
    at org.apache.spark.sql.execution.AppendColumnsWithObjectExec$$anonfun$9$$anonfun$apply$3.apply(objects.scala:300)
    at org.apache.spark.sql.execution.AppendColumnsWithObjectExec$$anonfun$9$$anonfun$apply$3.apply(objects.scala:298)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)


How can this code be fixed?

scala apache-spark

asked Nov 20 '18 at 16:03, edited Nov 21 '18 at 14:24 – Bay Max




















  • @user6910411: Added the stack trace. It will be difficult to post a reproducible example, as the flow is integrated into a framework spanning multiple libraries, but I can answer whatever details are required. – Bay Max, Nov 20 '18 at 16:28

  • Can you post the case class definitions for the two datasets? Did you add the naturalRank field to the second? – sramalingam24, Nov 20 '18 at 18:37

  • Also, you can just do row => row.ticker if the schema is specified correctly. – sramalingam24, Nov 20 '18 at 19:23
2 Answers


















The reported problem can be avoided by replacing the field-name version of the getAs[T] method (used in the function passed to groupByKey):

groupByKey(row => row.getAs[String]("_1"))

with the field-position version:

groupByKey(row => row.getAs[String](fieldIndexMap("_1")))

where fieldIndexMap maps field names to their corresponding field indexes:

val fieldIndexMap = tranform.schema.fieldNames.zipWithIndex.toMap
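
To see why the positional overload sidesteps the error, here is a small self-contained sketch; the literal index map is an assumption matching the question's schema (_1, _2, Count) and is not part of the original answer:

import org.apache.spark.sql.Row

// Row.fromSeq builds a schema-less GenericRow, exactly what the first
// flatMapGroups emits for each record.
val r: Row = Row.fromSeq(Seq("1", "One", 2))
val idx = Map("_1" -> 0, "_2" -> 1, "Count" -> 2)  // what fieldIndexMap works out to here
val key = r.getAs[String](idx("_1"))  // "1" -- positional access never calls fieldIndex
// r.getAs[String]("_1") would instead throw
// UnsupportedOperationException: fieldIndex on a Row without schema is undefined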


As a side note, your function for flatMapGroups can be simplified into something like the following:

val tranform2 = tranform.groupByKey(_.getAs[String](fieldIndexMap("_1")))
  .flatMapGroups((key, inputItr) => {
    val inputSeq = inputItr.toSeq
    val length = inputSeq.size
    inputSeq.map(r => Row.fromSeq(r.toSeq :+ length))
  })(expr1)

The inconsistent behavior between applying the original groupByKey/flatMapGroups methods to dataFrame versus tranform is apparently related to how the methods handle a DataFrame versus a Dataset[Row].






edited Dec 26 '18 at 17:37, answered Dec 26 '18 at 1:52 – Leo C

  • Accepting the answer after the expanded illustration. Also, I have already raised a Spark bug yesterday: issues.apache.org/jira/browse/SPARK-26436 – Bay Max, Dec 26 '18 at 18:19

































Solution as received on the Spark project's JIRA: https://issues.apache.org/jira/browse/SPARK-26436

The issue is caused by how the row is created:

listBuff += Row.fromSeq(x.toSeq ++ Array[Int](counter))

Row.fromSeq creates a GenericRow, and GenericRow's fieldIndex is not implemented because a GenericRow carries no schema.

Changing the line to create a GenericRowWithSchema solves it:

listBuff += new GenericRowWithSchema((x.toSeq ++ Array[Int](counter)).toArray, newSchema)
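
For completeness, GenericRowWithSchema lives in a Catalyst-internal package, so the fix also needs an extra import. A minimal sketch of the adjusted loop, assuming the same inputSeq, length, counter, and newSchema values as in the question:

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

for (i <- 0 until length) {
  val x = inputSeq(i)
  // Attaching newSchema makes fieldIndex (and therefore getAs[String]("_1"))
  // defined on the rows that reach the second groupByKey.
  listBuff += new GenericRowWithSchema((x.toSeq ++ Array[Int](counter)).toArray, newSchema)
}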





answered Jan 20 at 3:57 – Bay Max








