I am learning Elasticsearch and following a tutorial, but I get the following error:

Exception in thread "main" java.lang.IllegalArgumentException: The number of object passed must be even but was [1]
at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:451)
at elastic.elasti.App.lambda$0(App.java:55)
at java.util.ArrayList.forEach(ArrayList.java:1249)
at elastic.elasti.App.indexExampleData(App.java:53)
at elastic.elasti.App.main(App.java:45)

Could you help me fix it, please?

public class App
{
    public static void main( String[] args ) throws TwitterException, UnknownHostException
    {
        System.out.println( "Hello World!" );
        List tweetJsonList = searchForTweets();

        Client client = TransportClient.builder().build()
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
        String index = "tweets_juan";
        client.admin().indices()
                .create(new CreateIndexRequest(index))
                .actionGet();
        indexExampleData(client, tweetJsonList, index);
        searchExample(client);
    }

    public static void indexExampleData(Client client, List tweetJsonList, String index) {
        BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();

        tweetJsonList.forEach((jsonTweet) -> {
            bulkRequestBuilder.add(new IndexRequest(index, "tweets_juan")
                    .source(jsonTweet));
        });

        BulkResponse bulkItemResponses = bulkRequestBuilder.get();
    }

    public static void searchExample(Client client) {
        BoolQueryBuilder queryBuilder = QueryBuilders
                .boolQuery()
                .must(termsQuery("text", "españa"));

        SearchResponse searchResponse = client.prepareSearch("tweets_juan")
                .setQuery(queryBuilder)
                .setSize(25)
                .execute()
                .actionGet();
    }

    public static List searchForTweets() throws TwitterException {
        Twitter twitter = new TwitterFactory().getInstance();
        Query query = new Query("mundial baloncesto");
        List tweetList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            QueryResult queryResult = twitter.search(query);
            tweetList.addAll(queryResult.getTweets());
            if (!queryResult.hasNext()) {
                break;
            }
            query = queryResult.nextQuery();
        }
        Gson gson = new Gson();

        return (List) tweetList.stream().map(gson::toJson).collect(Collectors.toList());
    }
}

4 Answers

I know it's late, but the simple fix is to pass XContentType.JSON as the second argument to source(). XContentType is part of the Elasticsearch library, in the package org.elasticsearch.common.xcontent:

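// The list is typed as List<String> so that Java selects source(String, XContentType).
// With a raw List the element is an Object and the varargs overload is chosen instead.
public static void indexExampleData(Client client, List<String> tweetJsonList, String index) {
    BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();

    tweetJsonList.forEach((jsonTweet) -> {
        bulkRequestBuilder.add(new IndexRequest(index, "tweets_juan")
                .source(jsonTweet, XContentType.JSON));
    });

    BulkResponse bulkItemResponses = bulkRequestBuilder.get();
}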
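
Why both the argument order and the static type of the argument matter can be sketched without Elasticsearch on the classpath. The classes below (FakeIndexRequest, FakeContentType, SourceOverloadDemo) are hypothetical stand-ins that only copy the shape of three source overloads. The assumption is that the real IndexRequest resolves calls the same way, since overload resolution is done by the Java compiler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-ins: these are NOT Elasticsearch classes. They only mimic
// the shape of three IndexRequest.source overloads to show which one Java picks.
enum FakeContentType { JSON }

final class FakeIndexRequest {
    String storedAs; // records which overload handled the call

    // Shaped like source(Object...): alternating field names and values.
    FakeIndexRequest source(Object... pairs) {
        return source(FakeContentType.JSON, pairs);
    }

    // Shaped like source(XContentType, Object...): still field/value pairs.
    FakeIndexRequest source(FakeContentType type, Object... pairs) {
        if (pairs.length % 2 != 0) {
            throw new IllegalArgumentException(
                "The number of object passed must be even but was [" + pairs.length + "]");
        }
        storedAs = "field/value pairs";
        return this;
    }

    // Shaped like source(String, XContentType): the string is one whole document.
    FakeIndexRequest source(String json, FakeContentType type) {
        storedAs = "whole JSON document";
        return this;
    }
}

final class SourceOverloadDemo {
    static final String JSON = "{\"text\":\"hola\"}";

    // Returns the overload that ran, or the exception message if the call threw.
    static String describe(Supplier<FakeIndexRequest> call) {
        try {
            return call.get().storedAs;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    // A raw List hands its elements back as Object.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Object rawElement() {
        List tweets = new ArrayList();
        tweets.add(JSON);
        return tweets.get(0);
    }

    // One Object argument: only the varargs overload applies, with an odd count.
    static String rawElementOnly() {
        Object tweet = rawElement();
        return describe(() -> new FakeIndexRequest().source(tweet));
    }

    // Object plus content type: still the varargs overload, now with two "pairs" items.
    static String rawElementWithContentType() {
        Object tweet = rawElement();
        return describe(() -> new FakeIndexRequest().source(tweet, FakeContentType.JSON));
    }

    // Content type first: the (type, Object...) overload, with one leftover item.
    static String contentTypeFirst() {
        return describe(() -> new FakeIndexRequest().source(FakeContentType.JSON, JSON));
    }

    // String first, content type second: the whole-document overload.
    static String contentTypeSecond() {
        return describe(() -> new FakeIndexRequest().source(JSON, FakeContentType.JSON));
    }

    public static void main(String[] args) {
        System.out.println("raw element only:          " + rawElementOnly());
        System.out.println("raw element + contentType: " + rawElementWithContentType());
        System.out.println("contentType first:         " + contentTypeFirst());
        System.out.println("contentType second:        " + contentTypeSecond());
    }
}
```

Under these assumptions, only a String followed by the content type reaches the whole-document overload. An element taken from a raw List is an Object, so it falls through to the field/value varargs form even when the content type is passed second.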

  • This should be the selected answer.
    – senseiwu
    Commented Nov 27, 2018 at 9:49
  • I also noticed that XContentType.JSON must be the second parameter. Putting it first does compile, but later yields the same error message.
    – tsh
    Commented Sep 10, 2020 at 3:07
  • There seems to be a bug in OpenSearch. The parameters should be reversed here: IndexRequest.class ... public IndexRequest source(Object... source) { return this.source(Requests.INDEX_CONTENT_TYPE, source); }
    Commented Sep 23, 2022 at 18:44

Summary:

  1. A JSON object cannot be passed directly as the source for indexing.
  2. Either stringify your JSON with something like Jackson, or set the source as a Map.

Jackson:

String stringifiedJson = objectMapper.writeValueAsString(jsonObject);
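
The Map option in point 2 can be motivated with a small stdlib-only sketch. The varargs form of source reads its arguments as alternating field names and values, which amounts to a flat map, so a single JSON object leaves a name with no value. PairsToMapDemo and toMap are hypothetical names used for illustration and are not part of Elasticsearch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PairsToMapDemo {
    // Hypothetical helper for illustration only; it is not an Elasticsearch API.
    // It reads its arguments as alternating field names and values.
    static Map<String, Object> toMap(Object... pairs) {
        if (pairs.length % 2 != 0) {
            throw new IllegalArgumentException(
                "expected field/value pairs but got " + pairs.length + " argument(s)");
        }
        Map<String, Object> doc = new LinkedHashMap<>();
        for (int i = 0; i < pairs.length; i += 2) {
            // Even positions are field names, odd positions are their values.
            doc.put(String.valueOf(pairs[i]), pairs[i + 1]);
        }
        return doc;
    }

    public static void main(String[] args) {
        // Two fields, four arguments: a well-formed document.
        System.out.println(toMap("user", "juan", "retweets", 3));

        // A lone JSON string is one argument, so there is no value to pair it with.
        try {
            toMap("{\"user\":\"juan\"}");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A Map built this way (or by any JSON library) already has the name/value structure, which is why handing a Map to source avoids the even-count check.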

If you use the org.json library to manage JSON content and encounter this error, you can solve it this way (setting the source as Map as skgemini proposed in his answer):

JSONObject dataAsJson = new JSONObject(dataAsJsonFormattedString);
HashMap<String, Object> dataAsMap = new HashMap<String, Object>(dataAsJson.toMap());
bulkRequestBuilder.add(new IndexRequest(index, "tweets_juan").source(dataAsMap, XContentType.JSON));

The problem is that Elasticsearch does not accept a bare JSON string as the source when indexing through the RequestIndexer API. We need to pass XContentType.JSON as the second parameter: source(sourceStr, XContentType.JSON).

Tested this with the following combinations:

  • Elasticsearch version: 7.3.2
  • FlinkElasticConnector: flink-connector-elasticsearch7
  • Scala version: 2.13.1
  • Java version: 11.0.4

Sample code:

def sinkToES(counted: DataStream[(Employee, String)], index: String, indexType: String) = {
    try {
      val esSinkBuilder = new ElasticsearchSink.Builder[(Employee, String)](
        httpHosts, new ElasticsearchSinkFunction[(Employee, String)] {
          def process(element: (Employee, String), ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
            val tenant = element._1.tenantId
            val sourceStr = JsonUtils.toJson[Employee](element._1)
            // The content type goes second, so sourceStr is treated as a complete JSON document.
            indexer.add(Requests.indexRequest.index(tenant + "_" + index).`type`(indexType).id(element._2).source(sourceStr, XContentType.JSON))
          }
        }
      )
      counted.addSink(esSinkBuilder.build())
    } catch {
      // Assumes an SLF4J-style logger; passing e as the cause logs the stack trace.
      case e: Exception => logger.error("Exception in sinkToES", e)
    }
  }
